
Create JDBC Source and Sink connectors using docker-compose file (kafka-ui-connector.yaml) (#709)

Ildar Almakaev · 3 years ago
commit 3cec4a1d6f
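To try the change end to end, the stack can be brought up with Docker Compose. A minimal sketch, assuming the commands are run from the repository's docker/ directory:

    # start ZooKeeper, Kafka, Schema Registry, Kafka Connect, Postgres and the UI
    docker-compose -f kafka-ui-connectors.yaml up -d
    # the UI is then reachable at http://localhost:8080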

+ 19 - 0
docker/connectors/sink-activities.json

@@ -0,0 +1,19 @@
+{
+  "name": "sink_postgres_activities",
+  "config": {
+    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
+    "connection.url": "jdbc:postgresql://postgres-db:5432/test",
+    "connection.user": "dev_user",
+    "connection.password": "12345",
+    "topics": "source-activities",
+    "table.name.format": "sink_activities",
+    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
+    "key.converter.schema.registry.url": "http://schemaregistry0:8085",
+    "value.converter": "io.confluent.connect.avro.AvroConverter",
+    "value.converter.schema.registry.url": "http://schemaregistry0:8085",
+    "auto.create": "true",
+    "pk.mode": "record_value",
+    "pk.fields": "id",
+    "insert.mode": "upsert"
+  }
+}
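The start.sh script further down posts this file to the Kafka Connect REST API. The same registration can also be done by hand from the host, since the Connect REST port 8083 is published; a sketch, assuming the stack is already running and the command is issued from docker/connectors/:

    # create the sink connector manually via the Connect REST API
    curl -X POST -H "Content-Type: application/json" \
         -d @sink-activities.json http://localhost:8083/connectors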

+ 20 - 0
docker/connectors/source-activities.json

@@ -0,0 +1,20 @@
+{
+  "name": "source_postgres_activities",
+  "config": {
+    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
+    "connection.url": "jdbc:postgresql://postgres-db:5432/test",
+    "connection.user": "dev_user",
+    "connection.password": "12345",
+    "topic.prefix": "source-",
+    "poll.interval.ms": 3600000,
+    "table.whitelist": "public.activities",
+    "mode": "bulk",
+    "transforms": "extractkey",
+    "transforms.extractkey.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
+    "transforms.extractkey.field": "id",
+    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
+    "key.converter.schema.registry.url": "http://schemaregistry0:8085",
+    "value.converter": "io.confluent.connect.avro.AvroConverter",
+    "value.converter.schema.registry.url": "http://schemaregistry0:8085"
+  }
+}
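With mode set to bulk, the connector re-reads the whole public.activities table on every poll (here once per hour) and writes the rows as Avro records to the source-activities topic. One way to inspect those records is the Avro console consumer shipped in the Schema Registry image; a sketch, assuming the compose stack below is up and docker/ is the working directory:

    docker-compose -f kafka-ui-connectors.yaml exec schemaregistry0 \
      kafka-avro-console-consumer \
        --bootstrap-server kafka0:29092 \
        --topic source-activities \
        --from-beginning \
        --property schema.registry.url=http://schemaregistry0:8085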

+ 9 - 0
docker/connectors/start.sh

@@ -0,0 +1,9 @@
+#!/bin/bash
+while [[ "$(curl -s -o /dev/null -w '%{http_code}' kafka-connect0:8083)" != "200" ]]; do
+  sleep 5
+done
+
+echo -e "\n --------------Creating connectors..."
+for filename in /connectors/*.json; do
+  curl -X POST -H "Content-Type: application/json" -d @"$filename" http://kafka-connect0:8083/connectors
+done
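Once the script has run, the registrations can be verified from the host through the same REST API (the POST above returns 409 Conflict if a connector with that name already exists). A sketch:

    # list registered connectors and check one of them
    curl -s http://localhost:8083/connectors
    curl -s http://localhost:8083/connectors/source_postgres_activities/status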

+ 173 - 0
docker/kafka-ui-connectors.yaml

@@ -0,0 +1,173 @@
+---
+version: '2'
+services:
+
+  kafka-ui:
+    container_name: kafka-ui
+    image: provectuslabs/kafka-ui:master
+    ports:
+      - 8080:8080
+    depends_on:
+      - zookeeper0
+      - zookeeper1
+      - kafka0
+      - kafka1
+      - schemaregistry0
+      - kafka-connect0
+    environment:
+      KAFKA_CLUSTERS_0_NAME: local
+      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka0:29092
+      KAFKA_CLUSTERS_0_ZOOKEEPER: zookeeper0:2181
+      KAFKA_CLUSTERS_0_JMXPORT: 9997
+      KAFKA_CLUSTERS_0_SCHEMAREGISTRY: http://schemaregistry0:8085
+      KAFKA_CLUSTERS_0_KAFKACONNECT_0_NAME: first
+      KAFKA_CLUSTERS_0_KAFKACONNECT_0_ADDRESS: http://kafka-connect0:8083
+      KAFKA_CLUSTERS_1_NAME: secondLocal
+      KAFKA_CLUSTERS_1_BOOTSTRAPSERVERS: kafka1:29092
+      KAFKA_CLUSTERS_1_ZOOKEEPER: zookeeper1:2181
+      KAFKA_CLUSTERS_1_JMXPORT: 9998
+      KAFKA_CLUSTERS_1_SCHEMAREGISTRY: http://schemaregistry1:8085
+      KAFKA_CLUSTERS_1_KAFKACONNECT_0_NAME: first
+      KAFKA_CLUSTERS_1_KAFKACONNECT_0_ADDRESS: http://kafka-connect0:8083
+
+  zookeeper0:
+    image: confluentinc/cp-zookeeper:5.2.4
+    environment:
+      ZOOKEEPER_CLIENT_PORT: 2181
+      ZOOKEEPER_TICK_TIME: 2000
+    ports:
+      - 2181:2181
+
+  kafka0:
+    image: confluentinc/cp-kafka:5.2.4
+    depends_on:
+      - zookeeper0
+    ports:
+      - 9092:9092
+      - 9997:9997
+    environment:
+      KAFKA_BROKER_ID: 1
+      KAFKA_ZOOKEEPER_CONNECT: zookeeper0:2181
+      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka0:29092,PLAINTEXT_HOST://localhost:9092
+      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
+      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
+      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
+      JMX_PORT: 9997
+      KAFKA_JMX_OPTS: -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=kafka0 -Dcom.sun.management.jmxremote.rmi.port=9997
+
+  zookeeper1:
+    image: confluentinc/cp-zookeeper:5.2.4
+    environment:
+      ZOOKEEPER_CLIENT_PORT: 2181
+      ZOOKEEPER_TICK_TIME: 2000
+
+  kafka1:
+    image: confluentinc/cp-kafka:5.2.4
+    depends_on:
+      - zookeeper1
+    ports:
+      - 9093:9093
+      - 9998:9998
+    environment:
+      KAFKA_BROKER_ID: 1
+      KAFKA_ZOOKEEPER_CONNECT: zookeeper1:2181
+      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka1:29092,PLAINTEXT_HOST://localhost:9093
+      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
+      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
+      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
+      JMX_PORT: 9998
+      KAFKA_JMX_OPTS: -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=kafka1 -Dcom.sun.management.jmxremote.rmi.port=9998
+
+  schemaregistry0:
+    image: confluentinc/cp-schema-registry:5.2.4
+    ports:
+      - 8085:8085
+    depends_on:
+      - zookeeper0
+      - kafka0
+    environment:
+      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: PLAINTEXT://kafka0:29092
+      SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: zookeeper0:2181
+      SCHEMA_REGISTRY_KAFKASTORE_SECURITY_PROTOCOL: PLAINTEXT
+      SCHEMA_REGISTRY_HOST_NAME: schemaregistry0
+      SCHEMA_REGISTRY_LISTENERS: http://schemaregistry0:8085
+
+      SCHEMA_REGISTRY_SCHEMA_REGISTRY_INTER_INSTANCE_PROTOCOL: "http"
+      SCHEMA_REGISTRY_LOG4J_ROOT_LOGLEVEL: INFO
+      SCHEMA_REGISTRY_KAFKASTORE_TOPIC: _schemas
+
+  schemaregistry1:
+    image: confluentinc/cp-schema-registry:5.5.0
+    ports:
+      - 18085:8085
+    depends_on:
+      - zookeeper1
+      - kafka1
+    environment:
+      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: PLAINTEXT://kafka1:29092
+      SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: zookeeper1:2181
+      SCHEMA_REGISTRY_KAFKASTORE_SECURITY_PROTOCOL: PLAINTEXT
+      SCHEMA_REGISTRY_HOST_NAME: schemaregistry1
+      SCHEMA_REGISTRY_LISTENERS: http://schemaregistry1:8085
+
+      SCHEMA_REGISTRY_SCHEMA_REGISTRY_INTER_INSTANCE_PROTOCOL: "http"
+      SCHEMA_REGISTRY_LOG4J_ROOT_LOGLEVEL: INFO
+      SCHEMA_REGISTRY_KAFKASTORE_TOPIC: _schemas
+
+  kafka-connect0:
+    image: confluentinc/cp-kafka-connect:6.0.1
+    ports:
+      - 8083:8083
+    depends_on:
+      - kafka0
+      - schemaregistry0
+    environment:
+      CONNECT_BOOTSTRAP_SERVERS: kafka0:29092
+      CONNECT_GROUP_ID: compose-connect-group
+      CONNECT_CONFIG_STORAGE_TOPIC: _connect_configs
+      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
+      CONNECT_OFFSET_STORAGE_TOPIC: _connect_offset
+      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
+      CONNECT_STATUS_STORAGE_TOPIC: _connect_status
+      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
+      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
+      CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: http://schemaregistry0:8085
+      CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.storage.StringConverter
+      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schemaregistry0:8085
+      CONNECT_INTERNAL_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
+      CONNECT_INTERNAL_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
+      CONNECT_REST_ADVERTISED_HOST_NAME: kafka-connect0
+      CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components"
+
+  kafka-init-topics:
+    image: confluentinc/cp-kafka:5.2.4
+    volumes:
+      - ./message.json:/data/message.json
+    depends_on:
+      - kafka1
+    command: "bash -c 'echo Waiting for Kafka to be ready... && \
+               cub kafka-ready -b kafka1:29092 1 30 && \
+               kafka-topics --create --topic second.users --partitions 3 --replication-factor 1 --if-not-exists --zookeeper zookeeper1:2181 && \
+               kafka-topics --create --topic second.messages --partitions 2 --replication-factor 1 --if-not-exists --zookeeper zookeeper1:2181 && \
+               kafka-topics --create --topic first.messages --partitions 2 --replication-factor 1 --if-not-exists --zookeeper zookeeper0:2181 && \
+               kafka-console-producer --broker-list kafka1:29092 --topic second.users < /data/message.json'"
+
+  postgres-db:
+    build:
+      context: ./postgres
+      args:
+        image: postgres:9.6.22
+    ports:
+      - 5432:5432
+    environment:
+      POSTGRES_USER: 'dev_user'
+      POSTGRES_PASSWORD: '12345'
+
+  create-connectors:
+    image: tutum/curl
+    depends_on:
+      - postgres-db
+      - kafka-connect0
+    volumes:
+      - ./connectors:/connectors
+    command: bash -c '/connectors/start.sh'
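Once the sink connector has consumed records from source-activities, it auto-creates and upserts into the sink_activities table, which can be checked directly in the Postgres container. A sketch, again assuming docker/ as the working directory:

    docker-compose -f kafka-ui-connectors.yaml exec postgres-db \
      psql -U dev_user -d test -c 'SELECT * FROM sink_activities;'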

+ 9 - 0
docker/postgres/Dockerfile

@@ -0,0 +1,9 @@
+ARG image
+
+FROM ${image}
+
+LABEL maintainer="Provectus Team"
+
+COPY data.sql /docker-entrypoint-initdb.d
+
+EXPOSE 5432
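The build is parameterised on the base image: docker-compose passes postgres:9.6.22 through the image build arg (see the postgres-db service above). The same image can also be built on its own; a sketch, where the kafka-ui-postgres tag is just an illustrative name:

    # build the seeded Postgres image outside of docker-compose
    docker build --build-arg image=postgres:9.6.22 -t kafka-ui-postgres ./postgres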

+ 24 - 0
docker/postgres/data.sql

@@ -0,0 +1,24 @@
+CREATE DATABASE test WITH OWNER = dev_user;
+\connect test
+
+CREATE TABLE activities
+(
+    id        INTEGER PRIMARY KEY,
+    msg       varchar(24),
+    action    varchar(128),
+    browser   varchar(24),
+    device    json,
+    createdAt timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP
+);
+
+insert into activities(id, action, msg, browser, device)
+values (1, 'LOGIN', 'Success', 'Chrome', '{
+  "name": "Chrome",
+  "major": "67",
+  "version": "67.0.3396.99"
+}'),
+       (2, 'LOGIN', 'Failed', 'Apple WebKit', '{
+         "name": "WebKit",
+         "major": "605",
+         "version": "605.1.15"
+       }');
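These two seeded rows are what the source connector initially publishes. Because it polls in bulk mode, a row inserted later shows up in the source-activities topic (and then in sink_activities) after the next poll, which can be up to an hour away with the configured poll.interval.ms. A sketch of adding a third row through psql, with illustrative values:

    docker-compose -f kafka-ui-connectors.yaml exec postgres-db \
      psql -U dev_user -d test \
      -c "INSERT INTO activities(id, action, msg, browser, device) VALUES (3, 'LOGOUT', 'Success', 'Firefox', '{\"name\": \"Firefox\"}');"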