diff --git a/.github/workflows/backend.yml b/.github/workflows/backend.yml
index 735fd6325b658ac399e9ea0376b9f123b368a5d8..ae5757ce6f30e2a58f8575b8862d0b6cd0a5d5eb 100644
--- a/.github/workflows/backend.yml
+++ b/.github/workflows/backend.yml
@@ -20,7 +20,7 @@ jobs:
- name: Set the values
id: step_one
run: |
- cat "./kafka-ui-e2e-checks/.env.example" >> "./kafka-ui-e2e-checks/.env"
+ cat "./kafka-ui-e2e-checks/.env.ci" >> "./kafka-ui-e2e-checks/.env"
- name: pull docker
id: step_four
run: |
diff --git a/.github/workflows/pr-checks.yaml b/.github/workflows/pr-checks.yaml
index 7c156ff7c2e21ecbf1f86a6e824954b45eb1c0f9..48b47c5196f84b72b7114f39ae0999e0a6527086 100644
--- a/.github/workflows/pr-checks.yaml
+++ b/.github/workflows/pr-checks.yaml
@@ -10,6 +10,6 @@ jobs:
- uses: kentaro-m/task-completed-checker-action@v0.1.0
with:
repo-token: "${{ secrets.GITHUB_TOKEN }}"
- - uses: derkinderfietsen/pr-description-enforcer@v1
+ - uses: dekinderfiets/pr-description-enforcer@0.0.1
with:
- repo-token: '${{ secrets.GITHUB_TOKEN }}'
\ No newline at end of file
+ repo-token: '${{ secrets.GITHUB_TOKEN }}'
diff --git a/.github/workflows/release.yaml b/.github/workflows/release.yaml
index 9e9efc7dff3e85cbff99d640b196b5e1382538b2..15f7d7dd6d0e46f02743d804b43d3e1eb15dae01 100644
--- a/.github/workflows/release.yaml
+++ b/.github/workflows/release.yaml
@@ -1,14 +1,29 @@
name: release
on:
workflow_dispatch:
+ inputs:
+ customVersion:
+ description: 'A new version for release, please provide with -SNAPSHOT suffix'
+ required: false
+ default: '0.0.0'
+ rebuild:
+ description: 'A tag name for building previously created release'
+ required: false
+ default: 'v0.0.0'
+ extraMavenOptions:
+ description: 'A extra options for Maven'
+ required: false
+ default: ''
jobs:
release:
runs-on: ubuntu-latest
outputs:
- version: ${{steps.prep.outputs.version}}
+ version: ${{steps.build.outputs.version}}
steps:
- uses: actions/checkout@v2
+ with:
+ fetch-depth: 0
- run: |
git config user.name github-actions
git config user.email github-actions@github.com
@@ -19,12 +34,22 @@ jobs:
key: ${{ runner.os }}-maven-${{ hashFiles('**/pom.xml') }}
restore-keys: |
${{ runner.os }}-maven-
- - uses: actions/checkout@v2
- name: Set up JDK 1.13
uses: actions/setup-java@v1
with:
java-version: 1.13
+ - name: Checkout to specific tag
+ if: github.event.inputs.rebuild != 'v0.0.0'
+ run: |
+ git checkout tags/${{ github.event.inputs.rebuild }} -b rebuild-${{ github.event.inputs.rebuild }}
+ - name: Set custom version
+ if: github.event.inputs.customVersion != '0.0.0' && github.event.inputs.rebuild == 'v0.0.0'
+ run: |
+ mvn -q versions:set -DnewVersion=${{ github.event.inputs.customVersion }}
+ git add pom.xml **/pom.xml
+ git commit -m "Increased release"
- name: Update development version
+ if: github.event.inputs.rebuild == 'v0.0.0'
run: |
mvn -q versions:set -DnextSnapshot
git add pom.xml **/pom.xml
@@ -32,22 +57,24 @@ jobs:
git push -f
git reset --hard HEAD~1
- name: Prepare release
- id: prep
+ if: github.event.inputs.rebuild == 'v0.0.0'
run: |
mvn -q versions:set -DremoveSnapshot
- export VERSION=$(mvn -q -Dexec.executable=echo -Dexec.args='${project.version}' --non-recursive exec:exec)
git add .
git commit -m "release ${VERSION}"
git tag -f v${VERSION}
git push --tags
- echo ::set-output name=version::${VERSION}
- name: Build with Maven
- run: mvn clean package -Pprod
+ id: build
+ run: |
+ mvn clean package -Pprod ${{ github.event.inputs.extraMavenOptions }}
+ export VERSION=$(mvn -q -Dexec.executable=echo -Dexec.args='${project.version}' --non-recursive exec:exec)
+ echo ::set-output name=version::${VERSION}
- name: Archive JAR
uses: actions/upload-artifact@v2
with:
- name: kafka-ui-${{ steps.prep.outputs.version }}
- path: kafka-ui-api/target/kafka-ui-api-${{ steps.prep.outputs.version }}.jar
+ name: kafka-ui-${{ steps.build.outputs.version }}
+ path: kafka-ui-api/target/kafka-ui-api-${{ steps.build.outputs.version }}.jar
#################
# #
# Docker images #
@@ -70,20 +97,7 @@ jobs:
with:
username: ${{ secrets.DOCKERHUB_USERNAME }}
password: ${{ secrets.DOCKERHUB_TOKEN }}
- - name: Build
- if: github.ref != 'refs/heads/master'
- id: docker_build
- uses: docker/build-push-action@v2
- with:
- builder: ${{ steps.buildx.outputs.name }}
- context: kafka-ui-api
- push: false
- build-args: |
- JAR_FILE=kafka-ui-api-${{ steps.prep.outputs.version }}.jar
- cache-from: type=local,src=/tmp/.buildx-cache
- cache-to: type=local,dest=/tmp/.buildx-cache
- name: Build and push
- if: github.ref == 'refs/heads/master'
id: docker_build_and_push
uses: docker/build-push-action@v2
with:
@@ -91,10 +105,10 @@ jobs:
context: kafka-ui-api
push: true
tags: |
- provectuslabs/kafka-ui:${{ steps.prep.outputs.version }}
+ provectuslabs/kafka-ui:${{ steps.build.outputs.version }}
provectuslabs/kafka-ui:latest
build-args: |
- JAR_FILE=kafka-ui-api-${{ steps.prep.outputs.version }}.jar
+ JAR_FILE=kafka-ui-api-${{ steps.build.outputs.version }}.jar
cache-from: type=local,src=/tmp/.buildx-cache
cache-to: type=local,dest=/tmp/.buildx-cache
charts:
@@ -112,7 +126,7 @@ jobs:
run: |
export version=${{needs.release.outputs.version}}
sed -i "s/appVersion:.*/appVersion: ${version}/" charts/kafka-ui/Chart.yaml
- - name:
+ - name: add chart
run: |
export VERSION=${{needs.release.outputs.version}}
MSG=$(helm package --app-version ${VERSION} charts/kafka-ui)
@@ -158,4 +172,4 @@ jobs:
tag_name: "v${{needs.release.outputs.version}}"
prerelease: false
files: kafka-ui-api-${{needs.release.outputs.version}}.jar
- body: ${{steps.generate.outputs.changelog}}
\ No newline at end of file
+ body: ${{steps.generate.outputs.changelog}}
diff --git a/README.md b/README.md
index 99c0c8221e896c745ed1f2792d94d56cf373496f..11f4568b27d4fb47fe6d7ff02e1b54dccc85554b 100644
--- a/README.md
+++ b/README.md
@@ -1,15 +1,15 @@
- Kafka UI – Free Web UI for Kafka
+ UI for Apache Kafka – Free Web UI for Apache Kafka
------------------
-
+
-Kafka UI is a free open-source web UI for monitoring and management of Apache Kafka clusters.
+UI for Apache Kafka is a free open-source web UI for monitoring and management of Apache Kafka clusters.
-Kafka UI is a simple tool that makes your data flows observable, helps find and troubleshoot issues faster and deliver optimal performance. Its lightweight dashboard makes it easy to track key metrics of your Kafka clusters - Brokers, Topics, Partitions, Production, and Consumption.
+UI for Apache Kafka is a simple tool that makes your data flows observable, helps find and troubleshoot issues faster and deliver optimal performance. Its lightweight dashboard makes it easy to track key metrics of your Kafka clusters - Brokers, Topics, Partitions, Production, and Consumption.
-Set up Kafka UI with just a couple of easy commands to visualize your Kafka data in a comprehensible way. You can run the tool locally or in the cloud.
+Set up UI for Apache Kafka with just a couple of easy commands to visualize your Kafka data in a comprehensible way. You can run the tool locally or in the cloud.
-
+
# Features
@@ -25,10 +25,10 @@ Set up Kafka UI with just a couple of easy commands to visualize your Kafka data
# Getting Started
-To run Kafka UI, you can use a pre-built Docker image or build it locally.
+To run UI for Apache Kafka, you can use a pre-built Docker image or build it locally.
## Running From Docker Image
-The official Docker image for Kafka UI is hosted here: [hub.docker.com/r/provectuslabs/kafka-ui](https://hub.docker.com/r/provectuslabs/kafka-ui).
+The official Docker image for UI for Apache Kafka is hosted here: [hub.docker.com/r/provectuslabs/kafka-ui](https://hub.docker.com/r/provectuslabs/kafka-ui).
Launch Docker container in the background:
```sh
@@ -49,13 +49,13 @@ If you prefer to use `docker-compose` please refer to the [documentation](docker
## Building With Docker
-Steps to build Kafka UI locally with Docker:
+Steps to build UI for Apache Kafka locally with Docker:
1. Install prerequisites: Java and Docker
2. Clone this repository and open a terminal in the directory of the project
-3. Build a Docker container with Kafka UI
-4. Start Kafka UI with your Kafka clusters
-5. Navigate to Kafka UI
+3. Build a Docker container with UI for Apache Kafka
+4. Start UI for Apache Kafka with your Kafka clusters
+5. Navigate to UI for Apache Kafka
### Prerequisites
@@ -76,21 +76,21 @@ Steps to build Kafka UI locally with Docker:
Once you installed the prerequisites and cloned the repository, run the following commands in your project directory:
-Build a Docker container with Kafka UI:
+Build a Docker container with UI for Apache Kafka:
```sh
./mvnw clean install -Pprod
```
-Start Kafka UI with your Kafka clusters:
+Start UI for Apache Kafka with your Kafka clusters:
```sh
docker-compose -f ./docker/kafka-ui.yaml up
```
-To see Kafka UI, navigate to http://localhost:8080.
+To see UI for Apache Kafka, navigate to http://localhost:8080.
If you want to start only kafka-clusters:
```sh
docker-compose -f ./docker/kafka-clusters-only.yaml up
```
-Then start Kafka UI with a **local** profile.
+Then start UI for Apache Kafka with a **local** profile.
## Running Locally Without Docker
@@ -108,11 +108,12 @@ To read more please follow to [chart documentation](charts/kafka-ui/README.md)
# Guides
-To be done
+- [SSO configuration](guides/SSO.md)
+- [AWS IAM configuration](guides/AWS_IAM.md)
## Connecting to a Secure Broker
-Kafka UI supports TLS (SSL) and SASL connections for [encryption and authentication](http://kafka.apache.org/090/documentation.html#security). This can be configured by providing a combination of the following files (placed into the Kafka root directory):
+UI for Apache Kafka supports TLS (SSL) and SASL connections for [encryption and authentication](http://kafka.apache.org/090/documentation.html#security). This can be configured by providing a combination of the following files (placed into the Kafka root directory):
To be continued
@@ -131,6 +132,9 @@ kafka:
bootstrapServers: localhost:29091
zookeeper: localhost:2183
schemaRegistry: http://localhost:8085
+ schemaRegistryAuth:
+ username: username
+ password: password
# schemaNameTemplate: "%s-value"
jmxPort: 9997
-
@@ -140,6 +144,8 @@ kafka:
* `bootstrapServers`: where to connect
* `zookeeper`: zookeeper service address
* `schemaRegistry`: schemaRegistry's address
+* `schemaRegistryAuth.username`: schemaRegistry's basic authentication username
+* `schemaRegistryAuth.password`: schemaRegistry's basic authentication password
* `schemaNameTemplate`: how keys are saved to schemaRegistry
* `jmxPort`: open jmxPosrts of a broker
* `readOnly`: enable read only mode
@@ -153,19 +159,20 @@ For example, if you want to use an environment variable to set the `name` parame
|Name |Description
|-----------------------|-------------------------------
+|`SERVER_SERVLET_CONTEXT_PATH` | URI basePath
|`KAFKA_CLUSTERS_0_NAME` | Cluster name
|`KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS` |Address where to connect
|`KAFKA_CLUSTERS_0_ZOOKEEPER` | Zookeper service address
|`KAFKA_CLUSTERS_0_KSQLDBSERVER` | KSQL DB server address
|`KAFKA_CLUSTERS_0_PROPERTIES_SECURITY_PROTOCOL` |Security protocol to connect to the brokers. For SSL connection use "SSL", for plaintext connection don't set this environment variable
|`KAFKA_CLUSTERS_0_SCHEMAREGISTRY` |SchemaRegistry's address
+|`KAFKA_CLUSTERS_0_SCHEMAREGISTRYAUTH_USERNAME` |SchemaRegistry's basic authentication username
+|`KAFKA_CLUSTERS_0_SCHEMAREGISTRYAUTH_PASSWORD` |SchemaRegistry's basic authentication password
|`KAFKA_CLUSTERS_0_SCHEMANAMETEMPLATE` |How keys are saved to schemaRegistry
|`KAFKA_CLUSTERS_0_JMXPORT` |Open jmxPosrts of a broker
|`KAFKA_CLUSTERS_0_READONLY` |Enable read only mode. Default: false
+|`KAFKA_CLUSTERS_0_KAFKACONNECT_0_NAME` |Given name for the Kafka Connect cluster
+|`KAFKA_CLUSTERS_0_KAFKACONNECT_0_ADDRESS` |Address of the Kafka Connect service endpoint
|`LOGGING_LEVEL_ROOT` | Setting log level (all, debug, info, warn, error, fatal, off). Default: debug
|`LOGGING_LEVEL_COM_PROVECTUS` |Setting log level (all, debug, info, warn, error, fatal, off). Default: debug
-
-
-
-
-
+|`SERVER_PORT` |Port for the embedded server. Default `8080`
diff --git a/charts/kafka-ui/templates/NOTES.txt b/charts/kafka-ui/templates/NOTES.txt
index 65196604d4cf4ea61ae1edcf15eb24cfdea8c49e..94e8d3943441dcdc1250867771d424a405d05b13 100644
--- a/charts/kafka-ui/templates/NOTES.txt
+++ b/charts/kafka-ui/templates/NOTES.txt
@@ -17,5 +17,5 @@
{{- else if contains "ClusterIP" .Values.service.type }}
export POD_NAME=$(kubectl get pods --namespace {{ .Release.Namespace }} -l "app.kubernetes.io/name={{ include "kafka-ui.name" . }},app.kubernetes.io/instance={{ .Release.Name }}" -o jsonpath="{.items[0].metadata.name}")
echo "Visit http://127.0.0.1:8080 to use your application"
- kubectl --namespace {{ .Release.Namespace }} port-forward $POD_NAME 8080:80
+ kubectl --namespace {{ .Release.Namespace }} port-forward $POD_NAME 8080:8080
{{- end }}
diff --git a/charts/kafka-ui/templates/deployment.yaml b/charts/kafka-ui/templates/deployment.yaml
index fb9f9883b958c11cc779f32cb186f1399249da06..f8bfe02ae6ba9b5fdd8cc4904fe9a821c59e430e 100644
--- a/charts/kafka-ui/templates/deployment.yaml
+++ b/charts/kafka-ui/templates/deployment.yaml
@@ -52,14 +52,16 @@ spec:
protocol: TCP
livenessProbe:
httpGet:
- path: /
+ {{- $contextPath := .Values.envs.config.SERVER_SERVLET_CONTEXT_PATH | default "" | printf "%s/" | urlParse }}
+ path: {{ get $contextPath "path" }}
port: http
initialDelaySeconds: 60
periodSeconds: 30
timeoutSeconds: 10
readinessProbe:
httpGet:
- path: /
+ {{- $contextPath := .Values.envs.config.SERVER_SERVLET_CONTEXT_PATH | default "" | printf "%s/" | urlParse }}
+ path: {{ get $contextPath "path" }}
port: http
initialDelaySeconds: 60
periodSeconds: 30
diff --git a/charts/kafka-ui/templates/ingress.yaml b/charts/kafka-ui/templates/ingress.yaml
index a966d32785ec5c357a874bb00256d4b67f3d5ca2..d6e6450dc5734e6c832563527ba77261cb899322 100644
--- a/charts/kafka-ui/templates/ingress.yaml
+++ b/charts/kafka-ui/templates/ingress.yaml
@@ -16,26 +16,34 @@ metadata:
{{- toYaml . | nindent 4 }}
{{- end }}
spec:
- {{- if .Values.ingress.tls }}
+ {{- if .Values.ingress.tls.enabled }}
tls:
- {{- range .Values.ingress.tls }}
- hosts:
- {{- range .hosts }}
- - {{ . | quote }}
- {{- end }}
- secretName: {{ .secretName }}
- {{- end }}
+ - {{ .Values.ingress.host }}
+ secretName: {{ .Values.ingress.tls.secretName }}
{{- end }}
rules:
- {{- range .Values.ingress.hosts }}
- - host: {{ .host | quote }}
- http:
+ - http:
paths:
- {{- range .paths }}
- - path: {{ . }}
+ {{- range .Values.ingress.precedingPaths }}
+ - path: {{ .path }}
backend:
+ serviceName: {{ .serviceName }}
+ servicePort: {{ .servicePort }}
+ {{- end }}
+ - backend:
serviceName: {{ $fullName }}
servicePort: {{ $svcPort }}
+{{- if .Values.ingress.path }}
+ path: {{ .Values.ingress.path }}
+{{- end }}
+ {{- range .Values.ingress.succeedingPaths }}
+ - path: {{ .path }}
+ backend:
+ serviceName: {{ .serviceName }}
+ servicePort: {{ .servicePort }}
{{- end }}
- {{- end }}
- {{- end }}
+{{- if .Values.ingress.host }}
+ host: {{ .Values.ingress.host }}
+{{- end }}
+ {{- end }}
\ No newline at end of file
diff --git a/charts/kafka-ui/values.yaml b/charts/kafka-ui/values.yaml
index 5d20e6d1d37fdca05c5859c14b9ca0dbb624ef63..95b1b7c3838cf08f4454dff56fcff8b00bc5ad72 100644
--- a/charts/kafka-ui/values.yaml
+++ b/charts/kafka-ui/values.yaml
@@ -44,13 +44,32 @@ service:
# if you want to force a specific nodePort. Must be use with service.type=NodePort
# nodePort:
+# Ingress configuration
ingress:
+ # Enable ingress resource
enabled: false
+
+ # Annotations for the Ingress
annotations: {}
- hosts:
- - host: chart-example.local
- paths: []
- tls: []
+
+ # The path for the Ingress
+ path: ""
+
+ # The hostname for the Ingress
+ host: ""
+
+ # configs for Ingress TLS
+ tls:
+ # Enable TLS termination for the Ingress
+ enabled: false
+ # the name of a pre-created Secret containing a TLS private key and certificate
+ secretName: ""
+
+ # HTTP paths to add to the Ingress before the default path
+ precedingPaths: []
+
+ # Http paths to add to the Ingress after the default path
+ succeedingPaths: []
resources: {}
# limits:
diff --git a/docker-compose.md b/docker-compose.md
index 68be207ade4cbcb986e48227e3879353fab78451..56dbd3ca2201591287af6230ef882069b8bbb761 100644
--- a/docker-compose.md
+++ b/docker-compose.md
@@ -19,7 +19,7 @@ services:
- KAFKA_CLUSTERS_0_ZOOKEEPER=localhost:2181
```
-* If you prefer Kafka UI in read only mode
+* If you prefer UI for Apache Kafka in read only mode
```yaml
version: '2'
@@ -37,7 +37,7 @@ services:
- KAFKA_CLUSTERS_0_READONLY=true
```
-* Start Kafka UI process
+* Start UI for Apache Kafka process
```bash
docker-compose up -d kafka-ui
diff --git a/docker/connectors/sink-activities.json b/docker/connectors/sink-activities.json
new file mode 100644
index 0000000000000000000000000000000000000000..fe1e601214887cb1e809cf81280614b1c0ef0e39
--- /dev/null
+++ b/docker/connectors/sink-activities.json
@@ -0,0 +1,19 @@
+{
+ "name": "sink_postgres_activities",
+ "config": {
+ "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
+ "connection.url": "jdbc:postgresql://postgres-db:5432/test",
+ "connection.user": "dev_user",
+ "connection.password": "12345",
+ "topics": "source-activities",
+ "table.name.format": "sink_activities",
+ "key.converter": "org.apache.kafka.connect.storage.StringConverter",
+ "key.converter.schema.registry.url": "http://schemaregistry0:8085",
+ "value.converter": "io.confluent.connect.avro.AvroConverter",
+ "value.converter.schema.registry.url": "http://schemaregistry0:8085",
+ "auto.create": "true",
+ "pk.mode": "record_value",
+ "pk.fields": "id",
+ "insert.mode": "upsert"
+ }
+}
\ No newline at end of file
diff --git a/docker/connectors/source-activities.json b/docker/connectors/source-activities.json
new file mode 100644
index 0000000000000000000000000000000000000000..dc55dfea922538705f13f198a76c5b54a7f9bd72
--- /dev/null
+++ b/docker/connectors/source-activities.json
@@ -0,0 +1,20 @@
+{
+ "name": "source_postgres_activities",
+ "config": {
+ "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
+ "connection.url": "jdbc:postgresql://postgres-db:5432/test",
+ "connection.user": "dev_user",
+ "connection.password": "12345",
+ "topic.prefix": "source-",
+ "poll.interval.ms": 3600000,
+ "table.whitelist": "public.activities",
+ "mode": "bulk",
+ "transforms": "extractkey",
+ "transforms.extractkey.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
+ "transforms.extractkey.field": "id",
+ "key.converter": "org.apache.kafka.connect.storage.StringConverter",
+ "key.converter.schema.registry.url": "http://schemaregistry0:8085",
+ "value.converter": "io.confluent.connect.avro.AvroConverter",
+ "value.converter.schema.registry.url": "http://schemaregistry0:8085"
+ }
+}
\ No newline at end of file
diff --git a/docker/connectors/start.sh b/docker/connectors/start.sh
new file mode 100755
index 0000000000000000000000000000000000000000..7adc5e4e1275ae15935c440416cd5651861739c7
--- /dev/null
+++ b/docker/connectors/start.sh
@@ -0,0 +1,9 @@
+#! /bin/bash
+while [[ "$(curl -s -o /dev/null -w ''%{http_code}'' kafka-connect0:8083)" != "200" ]]
+ do sleep 5
+done
+
+echo "\n --------------Creating connectors..."
+for filename in /connectors/*.json; do
+ curl -X POST -H "Content-Type: application/json" -d @$filename http://kafka-connect0:8083/connectors
+done
diff --git a/docker/jaas/client.properties b/docker/jaas/client.properties
new file mode 100644
index 0000000000000000000000000000000000000000..db1feea1f81b1b44c25d905aa98713432b4dfd47
--- /dev/null
+++ b/docker/jaas/client.properties
@@ -0,0 +1,3 @@
+sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin-secret";
+security.protocol=SASL_PLAINTEXT
+sasl.mechanism=PLAIN
\ No newline at end of file
diff --git a/docker/jaas/kafka_server.conf b/docker/jaas/kafka_server.conf
new file mode 100644
index 0000000000000000000000000000000000000000..ef41c992e21cb68036449d8b5f8a9bdf4985f69d
--- /dev/null
+++ b/docker/jaas/kafka_server.conf
@@ -0,0 +1,14 @@
+KafkaServer {
+ org.apache.kafka.common.security.plain.PlainLoginModule required
+ username="admin"
+ password="admin-secret"
+ user_admin="admin-secret"
+ user_enzo="cisternino";
+};
+
+KafkaClient {
+ org.apache.kafka.common.security.plain.PlainLoginModule required
+ user_admin="admin-secret";
+};
+
+Client {};
\ No newline at end of file
diff --git a/docker/jaas/schema_registry.jaas b/docker/jaas/schema_registry.jaas
new file mode 100644
index 0000000000000000000000000000000000000000..2d50e515df0c43629cec7bb9b6429506799c59ac
--- /dev/null
+++ b/docker/jaas/schema_registry.jaas
@@ -0,0 +1,5 @@
+SchemaRegistryProps {
+ org.eclipse.jetty.jaas.spi.PropertyFileLoginModule required
+ file="/conf/schema_registry.password"
+ debug="false";
+};
diff --git a/docker/jaas/schema_registry.password b/docker/jaas/schema_registry.password
new file mode 100644
index 0000000000000000000000000000000000000000..97d0383bc385159a1eab3828617a0abed2334392
--- /dev/null
+++ b/docker/jaas/schema_registry.password
@@ -0,0 +1 @@
+admin: OBF:1w8t1tvf1w261w8v1w1c1tvn1w8x,admin
\ No newline at end of file
diff --git a/docker/kafka-cluster-sr-auth.yaml b/docker/kafka-cluster-sr-auth.yaml
new file mode 100644
index 0000000000000000000000000000000000000000..9cd22b160a4ae9f049c825430ceaa8275b75d9a0
--- /dev/null
+++ b/docker/kafka-cluster-sr-auth.yaml
@@ -0,0 +1,66 @@
+---
+version: '2'
+services:
+
+ zookeeper1:
+ image: confluentinc/cp-zookeeper:5.2.4
+ environment:
+ ZOOKEEPER_CLIENT_PORT: 2181
+ ZOOKEEPER_TICK_TIME: 2000
+ ports:
+ - 2182:2181
+
+ kafka1:
+ image: confluentinc/cp-kafka:5.2.4
+ depends_on:
+ - zookeeper1
+ environment:
+ KAFKA_BROKER_ID: 1
+ KAFKA_ZOOKEEPER_CONNECT: zookeeper1:2181
+ KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka1:29092,PLAINTEXT_HOST://localhost:9093
+ KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
+ KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
+ KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
+ JMX_PORT: 9998
+ KAFKA_JMX_OPTS: -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=localhost -Dcom.sun.management.jmxremote.rmi.port=9998
+ ports:
+ - 9093:9093
+ - 9998:9998
+
+ schemaregistry1:
+ image: confluentinc/cp-schema-registry:5.5.0
+ ports:
+ - 18085:8085
+ depends_on:
+ - zookeeper1
+ - kafka1
+ volumes:
+ - ./jaas:/conf
+ environment:
+ SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: PLAINTEXT://kafka1:29092
+ SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: zookeeper1:2181
+ SCHEMA_REGISTRY_KAFKASTORE_SECURITY_PROTOCOL: PLAINTEXT
+ SCHEMA_REGISTRY_HOST_NAME: schemaregistry1
+ SCHEMA_REGISTRY_LISTENERS: http://schemaregistry1:8085
+
+ # Default credentials: admin/letmein
+ SCHEMA_REGISTRY_AUTHENTICATION_METHOD: BASIC
+ SCHEMA_REGISTRY_AUTHENTICATION_REALM: SchemaRegistryProps
+ SCHEMA_REGISTRY_AUTHENTICATION_ROLES: admin
+ SCHEMA_REGISTRY_OPTS: -Djava.security.auth.login.config=/conf/schema_registry.jaas
+
+ SCHEMA_REGISTRY_SCHEMA_REGISTRY_INTER_INSTANCE_PROTOCOL: "http"
+ SCHEMA_REGISTRY_LOG4J_ROOT_LOGLEVEL: INFO
+ SCHEMA_REGISTRY_KAFKASTORE_TOPIC: _schemas
+
+ kafka-init-topics:
+ image: confluentinc/cp-kafka:5.2.4
+ volumes:
+ - ./message.json:/data/message.json
+ depends_on:
+ - kafka1
+ command: "bash -c 'echo Waiting for Kafka to be ready... && \
+ cub kafka-ready -b kafka1:29092 1 30 && \
+ kafka-topics --create --topic second.users --partitions 3 --replication-factor 1 --if-not-exists --zookeeper zookeeper1:2181 && \
+ kafka-topics --create --topic second.messages --partitions 2 --replication-factor 1 --if-not-exists --zookeeper zookeeper1:2181 && \
+ kafka-console-producer --broker-list kafka1:29092 -topic second.users < /data/message.json'"
diff --git a/docker/kafka-ui-connectors.yaml b/docker/kafka-ui-connectors.yaml
new file mode 100644
index 0000000000000000000000000000000000000000..e99a01c6da4cf2c6d1e50e7e2d87772e5e6dbd23
--- /dev/null
+++ b/docker/kafka-ui-connectors.yaml
@@ -0,0 +1,173 @@
+---
+version: '2'
+services:
+
+ kafka-ui:
+ container_name: kafka-ui
+ image: provectuslabs/kafka-ui:master
+ ports:
+ - 8080:8080
+ depends_on:
+ - zookeeper0
+ - zookeeper1
+ - kafka0
+ - kafka1
+ - schemaregistry0
+ - kafka-connect0
+ environment:
+ KAFKA_CLUSTERS_0_NAME: local
+ KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka0:29092
+ KAFKA_CLUSTERS_0_ZOOKEEPER: zookeeper0:2181
+ KAFKA_CLUSTERS_0_JMXPORT: 9997
+ KAFKA_CLUSTERS_0_SCHEMAREGISTRY: http://schemaregistry0:8085
+ KAFKA_CLUSTERS_0_KAFKACONNECT_0_NAME: first
+ KAFKA_CLUSTERS_0_KAFKACONNECT_0_ADDRESS: http://kafka-connect0:8083
+ KAFKA_CLUSTERS_1_NAME: secondLocal
+ KAFKA_CLUSTERS_1_BOOTSTRAPSERVERS: kafka1:29092
+ KAFKA_CLUSTERS_1_ZOOKEEPER: zookeeper1:2181
+ KAFKA_CLUSTERS_1_JMXPORT: 9998
+ KAFKA_CLUSTERS_1_SCHEMAREGISTRY: http://schemaregistry1:8085
+ KAFKA_CLUSTERS_1_KAFKACONNECT_0_NAME: first
+ KAFKA_CLUSTERS_1_KAFKACONNECT_0_ADDRESS: http://kafka-connect0:8083
+
+ zookeeper0:
+ image: confluentinc/cp-zookeeper:5.2.4
+ environment:
+ ZOOKEEPER_CLIENT_PORT: 2181
+ ZOOKEEPER_TICK_TIME: 2000
+ ports:
+ - 2181:2181
+
+ kafka0:
+ image: confluentinc/cp-kafka:5.2.4
+ depends_on:
+ - zookeeper0
+ ports:
+ - 9092:9092
+ - 9997:9997
+ environment:
+ KAFKA_BROKER_ID: 1
+ KAFKA_ZOOKEEPER_CONNECT: zookeeper0:2181
+ KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka0:29092,PLAINTEXT_HOST://localhost:9092
+ KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
+ KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
+ KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
+ JMX_PORT: 9997
+ KAFKA_JMX_OPTS: -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=kafka0 -Dcom.sun.management.jmxremote.rmi.port=9997
+
+ zookeeper1:
+ image: confluentinc/cp-zookeeper:5.2.4
+ environment:
+ ZOOKEEPER_CLIENT_PORT: 2181
+ ZOOKEEPER_TICK_TIME: 2000
+
+ kafka1:
+ image: confluentinc/cp-kafka:5.2.4
+ depends_on:
+ - zookeeper1
+ ports:
+ - 9093:9093
+ - 9998:9998
+ environment:
+ KAFKA_BROKER_ID: 1
+ KAFKA_ZOOKEEPER_CONNECT: zookeeper1:2181
+ KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka1:29092,PLAINTEXT_HOST://localhost:9093
+ KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
+ KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
+ KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
+ JMX_PORT: 9998
+ KAFKA_JMX_OPTS: -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=kafka1 -Dcom.sun.management.jmxremote.rmi.port=9998
+
+ schemaregistry0:
+ image: confluentinc/cp-schema-registry:5.2.4
+ ports:
+ - 8085:8085
+ depends_on:
+ - zookeeper0
+ - kafka0
+ environment:
+ SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: PLAINTEXT://kafka0:29092
+ SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: zookeeper0:2181
+ SCHEMA_REGISTRY_KAFKASTORE_SECURITY_PROTOCOL: PLAINTEXT
+ SCHEMA_REGISTRY_HOST_NAME: schemaregistry0
+ SCHEMA_REGISTRY_LISTENERS: http://schemaregistry0:8085
+
+ SCHEMA_REGISTRY_SCHEMA_REGISTRY_INTER_INSTANCE_PROTOCOL: "http"
+ SCHEMA_REGISTRY_LOG4J_ROOT_LOGLEVEL: INFO
+ SCHEMA_REGISTRY_KAFKASTORE_TOPIC: _schemas
+
+ schemaregistry1:
+ image: confluentinc/cp-schema-registry:5.5.0
+ ports:
+ - 18085:8085
+ depends_on:
+ - zookeeper1
+ - kafka1
+ environment:
+ SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: PLAINTEXT://kafka1:29092
+ SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: zookeeper1:2181
+ SCHEMA_REGISTRY_KAFKASTORE_SECURITY_PROTOCOL: PLAINTEXT
+ SCHEMA_REGISTRY_HOST_NAME: schemaregistry1
+ SCHEMA_REGISTRY_LISTENERS: http://schemaregistry1:8085
+
+ SCHEMA_REGISTRY_SCHEMA_REGISTRY_INTER_INSTANCE_PROTOCOL: "http"
+ SCHEMA_REGISTRY_LOG4J_ROOT_LOGLEVEL: INFO
+ SCHEMA_REGISTRY_KAFKASTORE_TOPIC: _schemas
+
+ kafka-connect0:
+ image: confluentinc/cp-kafka-connect:6.0.1
+ ports:
+ - 8083:8083
+ depends_on:
+ - kafka0
+ - schemaregistry0
+ environment:
+ CONNECT_BOOTSTRAP_SERVERS: kafka0:29092
+ CONNECT_GROUP_ID: compose-connect-group
+ CONNECT_CONFIG_STORAGE_TOPIC: _connect_configs
+ CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
+ CONNECT_OFFSET_STORAGE_TOPIC: _connect_offset
+ CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
+ CONNECT_STATUS_STORAGE_TOPIC: _connect_status
+ CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
+ CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
+ CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: http://schemaregistry0:8085
+ CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.storage.StringConverter
+ CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schemaregistry0:8085
+ CONNECT_INTERNAL_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
+ CONNECT_INTERNAL_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
+ CONNECT_REST_ADVERTISED_HOST_NAME: kafka-connect0
+ CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components"
+
+ kafka-init-topics:
+ image: confluentinc/cp-kafka:5.2.4
+ volumes:
+ - ./message.json:/data/message.json
+ depends_on:
+ - kafka1
+ command: "bash -c 'echo Waiting for Kafka to be ready... && \
+ cub kafka-ready -b kafka1:29092 1 30 && \
+ kafka-topics --create --topic second.users --partitions 3 --replication-factor 1 --if-not-exists --zookeeper zookeeper1:2181 && \
+ kafka-topics --create --topic second.messages --partitions 2 --replication-factor 1 --if-not-exists --zookeeper zookeeper1:2181 && \
+ kafka-topics --create --topic first.messages --partitions 2 --replication-factor 1 --if-not-exists --zookeeper zookeeper0:2181 && \
+ kafka-console-producer --broker-list kafka1:29092 -topic second.users < /data/message.json'"
+
+ postgres-db:
+ build:
+ context: ./postgres
+ args:
+ image: postgres:9.6.22
+ ports:
+ - 5432:5432
+ environment:
+ POSTGRES_USER: 'dev_user'
+ POSTGRES_PASSWORD: '12345'
+
+ create-connectors:
+ image: tutum/curl
+ depends_on:
+ - postgres-db
+ - kafka-connect0
+ volumes:
+ - ./connectors:/connectors
+ command: bash -c '/connectors/start.sh'
\ No newline at end of file
diff --git a/docker/kafka-ui-reverse-proxy.yaml b/docker/kafka-ui-reverse-proxy.yaml
new file mode 100644
index 0000000000000000000000000000000000000000..69d94e627a6820bb66be0293964f44ddeba30078
--- /dev/null
+++ b/docker/kafka-ui-reverse-proxy.yaml
@@ -0,0 +1,19 @@
+---
+version: '2'
+services:
+ nginx:
+ image: nginx:latest
+ volumes:
+ - ./proxy.conf:/etc/nginx/conf.d/default.conf
+ ports:
+ - 8080:80
+
+ kafka-ui:
+ container_name: kafka-ui
+ image: provectuslabs/kafka-ui:latest
+ ports:
+ - 8082:8080
+ environment:
+ KAFKA_CLUSTERS_0_NAME: local
+ KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9092
+ SERVER_SERVLET_CONTEXT_PATH: /kafka-ui
diff --git a/docker/kafka-ui-sasl.yaml b/docker/kafka-ui-sasl.yaml
new file mode 100644
index 0000000000000000000000000000000000000000..1c0312f11a2141de8669e54274d15caa773f8dee
--- /dev/null
+++ b/docker/kafka-ui-sasl.yaml
@@ -0,0 +1,52 @@
+---
+version: '2'
+services:
+
+ kafka-ui:
+ container_name: kafka-ui
+ image: provectuslabs/kafka-ui:latest
+ ports:
+ - 8080:8080
+ depends_on:
+ - zookeeper
+ - kafka
+ environment:
+ KAFKA_CLUSTERS_0_NAME: local
+# SERVER_SERVLET_CONTEXT_PATH: "/kafkaui"
+ KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9092
+ KAFKA_CLUSTERS_0_ZOOKEEPER: zookeeper:2181
+ KAFKA_CLUSTERS_0_PROPERTIES_SECURITY_PROTOCOL: SASL_PLAINTEXT
+ KAFKA_CLUSTERS_0_PROPERTIES_SASL_MECHANISM: PLAIN
+ KAFKA_CLUSTERS_0_PROPERTIES_SASL_JAAS_CONFIG: 'org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin-secret";'
+ zookeeper:
+ image: confluentinc/cp-zookeeper:5.2.4
+ environment:
+ ZOOKEEPER_CLIENT_PORT: 2181
+ ZOOKEEPER_TICK_TIME: 2000
+ ports:
+ - 2181:2181
+
+ kafka:
+ image: wurstmeister/kafka:latest
+ hostname: kafka
+ container_name: kafka
+ depends_on:
+ - zookeeper
+ ports:
+ - '9092:9092'
+ environment:
+ KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
+ KAFKA_LISTENERS: SASL_PLAINTEXT://kafka:9092
+ KAFKA_ADVERTISED_LISTENERS: SASL_PLAINTEXT://kafka:9092
+ KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
+ ALLOW_PLAINTEXT_LISTENER: 'yes'
+ KAFKA_OPTS: "-Djava.security.auth.login.config=/etc/kafka/jaas/kafka_server.conf"
+ KAFKA_AUTHORIZER_CLASS_NAME: kafka.security.auth.SimpleAclAuthorizer
+ KAFKA_INTER_BROKER_LISTENER_NAME: SASL_PLAINTEXT
+ KAFKA_SASL_ENABLED_MECHANISMS: PLAIN
+ KAFKA_SASL_MECHANISM_INTER_BROKER_PROTOCOL: PLAIN
+ KAFKA_SECURITY_PROTOCOL: SASL_PLAINTEXT
+ KAFKA_SUPER_USERS: User:admin,User:enzo
+ KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: 'true'
+ volumes:
+ - ./jaas:/etc/kafka/jaas
\ No newline at end of file
diff --git a/docker/kafka-ui.yaml b/docker/kafka-ui.yaml
index c384a8f4b3c95a074fdbfaede5b4d42a5e4636cd..217337ea5eaae29e1e7ffcbebfe48ca43c545e65 100644
--- a/docker/kafka-ui.yaml
+++ b/docker/kafka-ui.yaml
@@ -26,7 +26,7 @@ services:
KAFKA_CLUSTERS_1_BOOTSTRAPSERVERS: kafka1:29092
KAFKA_CLUSTERS_1_ZOOKEEPER: zookeeper1:2181
KAFKA_CLUSTERS_1_JMXPORT: 9998
- KAFKA_CLUSTERS_1_SCHEMAREGISTRY: http://schemaregistry0:8085
+ KAFKA_CLUSTERS_1_SCHEMAREGISTRY: http://schemaregistry1:8085
KAFKA_CLUSTERS_1_KAFKACONNECT_0_NAME: first
KAFKA_CLUSTERS_1_KAFKACONNECT_0_ADDRESS: http://kafka-connect0:8083
diff --git a/docker/message.json b/docker/message.json
new file mode 100644
index 0000000000000000000000000000000000000000..9e26dfeeb6e641a33dae4961196235bdb965b21b
--- /dev/null
+++ b/docker/message.json
@@ -0,0 +1 @@
+{}
\ No newline at end of file
diff --git a/docker/postgres/Dockerfile b/docker/postgres/Dockerfile
new file mode 100644
index 0000000000000000000000000000000000000000..12db53e1eb658204622b5ed9ce1cbc44f00d582c
--- /dev/null
+++ b/docker/postgres/Dockerfile
@@ -0,0 +1,9 @@
+ARG image
+
+FROM ${image}
+
+MAINTAINER Provectus Team
+
+ADD data.sql /docker-entrypoint-initdb.d
+
+EXPOSE 5432
\ No newline at end of file
diff --git a/docker/postgres/data.sql b/docker/postgres/data.sql
new file mode 100644
index 0000000000000000000000000000000000000000..0e1ffad5baa7f3670d57e9ea5367ce6006fc1485
--- /dev/null
+++ b/docker/postgres/data.sql
@@ -0,0 +1,24 @@
+CREATE DATABASE test WITH OWNER = dev_user;
+\connect test
+
+CREATE TABLE activities
+(
+ id INTEGER PRIMARY KEY,
+ msg varchar(24),
+ action varchar(128),
+ browser varchar(24),
+ device json,
+ createdAt timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP
+);
+
+insert into activities(id, action, msg, browser, device)
+values (1, 'LOGIN', 'Success', 'Chrome', '{
+ "name": "Chrome",
+ "major": "67",
+ "version": "67.0.3396.99"
+}'),
+ (2, 'LOGIN', 'Failed', 'Apple WebKit', '{
+ "name": "WebKit",
+ "major": "605",
+ "version": "605.1.15"
+ }');
\ No newline at end of file
diff --git a/docker/proxy.conf b/docker/proxy.conf
new file mode 100644
index 0000000000000000000000000000000000000000..3c72de8330157a653f111ce95558749c8e3f9771
--- /dev/null
+++ b/docker/proxy.conf
@@ -0,0 +1,9 @@
+server {
+ listen 80;
+ server_name localhost;
+
+ location /kafka-ui {
+# rewrite /kafka-ui/(.*) /$1 break;
+ proxy_pass http://kafka-ui:8080;
+ }
+}
diff --git a/guides/AWS_IAM.md b/guides/AWS_IAM.md
new file mode 100644
index 0000000000000000000000000000000000000000..80bfab205bc92312223770851818b778d9e0900c
--- /dev/null
+++ b/guides/AWS_IAM.md
@@ -0,0 +1,41 @@
+# How to configure AWS IAM Authentication
+
+UI for Apache Kafka comes with built-in [aws-msk-iam-auth](https://github.com/aws/aws-msk-iam-auth) library.
+
+You could pass sasl configs in properties section for each cluster.
+
+More details could be found here: [aws-msk-iam-auth](https://github.com/aws/aws-msk-iam-auth)
+
+## Examples:
+
+Please replace
+* with broker list
+* with your aws profile
+
+
+### Running From Docker Image
+
+```sh
+docker run -p 8080:8080 \
+ -e KAFKA_CLUSTERS_0_NAME=local \
+ -e KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS= \
+ -e KAFKA_CLUSTERS_0_PROPERTIES_SECURITY_PROTOCOL=SASL_SSL \
+ -e KAFKA_CLUSTERS_0_PROPERTIES_SASL_MECHANISM=AWS_MSK_IAM \
+ -e KAFKA_CLUSTERS_0_PROPERTIES_SASL_CLIENT_CALLBACK_HANDLER_CLASS=software.amazon.msk.auth.iam.IAMClientCallbackHandler \
+ -e KAFKA_CLUSTERS_0_PROPERTIES_SASL_JAAS_CONFIG=software.amazon.msk.auth.iam.IAMLoginModule required awsProfileName=""; \
+ -d provectuslabs/kafka-ui:latest
+```
+
+### Configuring by application.yaml
+
+```yaml
+kafka:
+ clusters:
+ - name: local
+ bootstrapServers:
+ properties:
+ security.protocol: SASL_SSL
+ sasl.mechanism: AWS_MSK_IAM
+ sasl.client.callback.handler.class: software.amazon.msk.auth.iam.IAMClientCallbackHandler
+ sasl.jaas.config: software.amazon.msk.auth.iam.IAMLoginModule required awsProfileName="";
+```
\ No newline at end of file
diff --git a/guides/SSO.md b/guides/SSO.md
new file mode 100644
index 0000000000000000000000000000000000000000..2cb2e7fc603b6dd55de1ad45fd53552c706952d3
--- /dev/null
+++ b/guides/SSO.md
@@ -0,0 +1,48 @@
+# How to configure SSO
+SSO require additionaly to configure TLS for application, in that example we will use self-signed certificate, in case of use legal certificates please skip step 1.
+#### Step 1
+At this step we will generate self-signed PKCS12 keypair.
+``` bash
+mkdir cert
+keytool -genkeypair -alias ui-for-apache-kafka -keyalg RSA -keysize 2048 \
+ -storetype PKCS12 -keystore cert/ui-for-apache-kafka.p12 -validity 3650
+```
+#### Step 2
+Create new application in any SSO provider, we will continue with [Auth0](https://auth0.com).
+
+
+
+After that need to provide callback URLs, in our case we will use `https://127.0.0.1:8080/login/oauth2/code/auth0`
+
+
+
+This is a main parameters required for enabling SSO
+
+
+
+#### Step 3
+To launch UI for Apache Kafka with enabled TLS and SSO run following:
+``` bash
+docker run -p 8080:8080 -v `pwd`/cert:/opt/cert -e AUTH_ENABLED=true \
+ -e SECURITY_BASIC_ENABLED=true \
+ -e SERVER_SSL_KEY_STORE_TYPE=PKCS12 \
+ -e SERVER_SSL_KEY_STORE=/opt/cert/ui-for-apache-kafka.p12 \
+ -e SERVER_SSL_KEY_STORE_PASSWORD=123456 \
+ -e SERVER_SSL_KEY_ALIAS=ui-for-apache-kafka \
+ -e SERVER_SSL_ENABLED=true \
+ -e SPRING_SECURITY_OAUTH2_CLIENT_REGISTRATION_AUTH0_CLIENTID=uhvaPKIHU4ZF8Ne4B6PGvF0hWW6OcUSB \
+ -e SPRING_SECURITY_OAUTH2_CLIENT_REGISTRATION_AUTH0_CLIENTSECRET=YXfRjmodifiedTujnkVr7zuW9ECCAK4TcnCio-i \
+ -e SPRING_SECURITY_OAUTH2_CLIENT_PROVIDER_AUTH0_ISSUER_URI=https://dev-a63ggcut.auth0.com/ \
+ -e TRUST_STORE=/opt/cert/ui-for-apache-kafka.p12 \
+ -e TRUST_STORE_PASSWORD=123456 \
+provectuslabs/kafka-ui:0.1.0
+```
+In the case with trusted CA-signed SSL certificate and SSL termination somewhere outside of application we can pass only SSO related environment variables:
+``` bash
+docker run -p 8080:8080 -v `pwd`/cert:/opt/cert -e AUTH_ENABLED=true \
+ -e SECURITY_BASIC_ENABLED=true \
+ -e SPRING_SECURITY_OAUTH2_CLIENT_REGISTRATION_AUTH0_CLIENTID=uhvaPKIHU4ZF8Ne4B6PGvF0hWW6OcUSB \
+ -e SPRING_SECURITY_OAUTH2_CLIENT_REGISTRATION_AUTH0_CLIENTSECRET=YXfRjmodifiedTujnkVr7zuW9ECCAK4TcnCio-i \
+ -e SPRING_SECURITY_OAUTH2_CLIENT_PROVIDER_AUTH0_ISSUER_URI=https://dev-a63ggcut.auth0.com/ \
+provectuslabs/kafka-ui:0.1.0
+```
diff --git a/images/apache-kafka-ui-interface-dashboard.png b/images/apache-kafka-ui-interface-dashboard.png
new file mode 100644
index 0000000000000000000000000000000000000000..b11f5c9059d15e4b86f91c55610ead710c8b8f0c
Binary files /dev/null and b/images/apache-kafka-ui-interface-dashboard.png differ
diff --git a/images/kafka-ui-interface-dashboard.png b/images/kafka-ui-interface-dashboard.png
deleted file mode 100644
index ec86fa2ea5bc19653b73398b4d85bce9ada211f2..0000000000000000000000000000000000000000
Binary files a/images/kafka-ui-interface-dashboard.png and /dev/null differ
diff --git a/kafka-ui-api/pom.xml b/kafka-ui-api/pom.xml
index bd1447b1ca7a32266b1c5decbbef0a3ac92f1ba2..ea7a888173d526067f9f5359a803cd16fb96be51 100644
--- a/kafka-ui-api/pom.xml
+++ b/kafka-ui-api/pom.xml
@@ -4,7 +4,7 @@
kafka-ui
com.provectus
- 0.0.11-SNAPSHOT
+ 0.1.1-SNAPSHOT
4.0.0
@@ -86,12 +86,23 @@
kafka-avro-serializer
${confluent.version}
+
+ io.confluent
+ kafka-json-schema-serializer
+ ${confluent.version}
+
io.confluent
kafka-protobuf-serializer
${confluent.version}
+
+ software.amazon.msk
+ aws-msk-iam-auth
+ 1.1.0
+
+
org.apache.avro
avro
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/ClustersProperties.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/ClustersProperties.java
index 2a1883287bc9f46deb6751eb05066d92f7e11f58..63925376cabc25d8e6f1ee3ad1405c58d7eb1dac 100644
--- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/ClustersProperties.java
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/ClustersProperties.java
@@ -20,8 +20,10 @@ public class ClustersProperties {
String bootstrapServers;
String zookeeper;
String schemaRegistry;
+ SchemaRegistryAuth schemaRegistryAuth;
String ksqldbServer;
String schemaNameTemplate = "%s-value";
+ String keySchemaNameTemplate = "%s-key";
String protobufFile;
String protobufMessageName;
List kafkaConnect;
@@ -35,4 +37,10 @@ public class ClustersProperties {
String name;
String address;
}
+
+ @Data
+ public static class SchemaRegistryAuth {
+ String username;
+ String password;
+ }
}
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/Config.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/Config.java
index de2f37754210d8357f70e7693293bc4a8921a75b..11b987fa6d97ba8f8711665bc59deffece57cb6f 100644
--- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/Config.java
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/Config.java
@@ -22,7 +22,7 @@ public class Config {
}
private GenericKeyedObjectPoolConfig poolConfig() {
- GenericKeyedObjectPoolConfig poolConfig = new GenericKeyedObjectPoolConfig();
+ final var poolConfig = new GenericKeyedObjectPoolConfig();
poolConfig.setMaxIdlePerKey(3);
poolConfig.setMaxTotalPerKey(3);
return poolConfig;
@@ -30,7 +30,7 @@ public class Config {
@Bean
public MBeanExporter exporter() {
- final MBeanExporter exporter = new MBeanExporter();
+ final var exporter = new MBeanExporter();
exporter.setAutodetect(true);
exporter.setExcludedBeans("pool");
return exporter;
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/CustomWebFilter.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/CustomWebFilter.java
index 8ad7f66db53bb1f498ff2b323a6b3871730cc8a0..6dce3b5e012c43422ad32f6692d15e9904a76fbb 100644
--- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/CustomWebFilter.java
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/CustomWebFilter.java
@@ -1,5 +1,6 @@
package com.provectus.kafka.ui.config;
+import org.springframework.boot.autoconfigure.web.ServerProperties;
import org.springframework.stereotype.Component;
import org.springframework.web.server.ServerWebExchange;
import org.springframework.web.server.WebFilter;
@@ -7,15 +8,32 @@ import org.springframework.web.server.WebFilterChain;
import reactor.core.publisher.Mono;
@Component
+
public class CustomWebFilter implements WebFilter {
+
+ private final ServerProperties serverProperties;
+
+ public CustomWebFilter(ServerProperties serverProperties) {
+ this.serverProperties = serverProperties;
+ }
+
@Override
public Mono filter(ServerWebExchange exchange, WebFilterChain chain) {
- if (exchange.getRequest().getURI().getPath().equals("/")
- || exchange.getRequest().getURI().getPath().startsWith("/ui")) {
+ String contextPath = serverProperties.getServlet().getContextPath() != null
+ ? serverProperties.getServlet().getContextPath() : "";
+
+ final String path = exchange.getRequest().getURI().getPath().replaceAll("/$", "");
+ if (path.equals(contextPath) || path.startsWith(contextPath + "/ui")) {
return chain.filter(
exchange.mutate().request(exchange.getRequest().mutate().path("/index.html").build())
- .build());
- }
+ .build()
+ );
+ } else if (path.startsWith(contextPath)) {
+ return chain.filter(
+ exchange.mutate().request(exchange.getRequest().mutate().contextPath(contextPath).build())
+ .build()
+ );
+ }
return chain.filter(exchange);
}
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/ClustersController.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/ClustersController.java
index fea8f24c887cfb100745d8aae30722d3db45cb54..cb3b3f2674b0f61a7cd32982fe851f22eb77c5e0 100644
--- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/ClustersController.java
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/ClustersController.java
@@ -39,4 +39,10 @@ public class ClustersController implements ClustersApi {
public Mono>> getClusters(ServerWebExchange exchange) {
return Mono.just(ResponseEntity.ok(Flux.fromIterable(clusterService.getClusters())));
}
+
+ @Override
+ public Mono> updateClusterInfo(String clusterName,
+ ServerWebExchange exchange) {
+ return clusterService.updateCluster(clusterName).map(ResponseEntity::ok);
+ }
}
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/ConsumerGroupsController.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/ConsumerGroupsController.java
index ba52586ee095838690ef4e2e33ed58afb36b9fe0..b0bac49169a65dab4407b4731f8d8d5ce6f38a56 100644
--- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/ConsumerGroupsController.java
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/ConsumerGroupsController.java
@@ -1,13 +1,23 @@
package com.provectus.kafka.ui.controller;
+import static java.util.stream.Collectors.toMap;
+
import com.provectus.kafka.ui.api.ConsumerGroupsApi;
+import com.provectus.kafka.ui.exception.ClusterNotFoundException;
+import com.provectus.kafka.ui.exception.ValidationException;
import com.provectus.kafka.ui.model.ConsumerGroup;
import com.provectus.kafka.ui.model.ConsumerGroupDetails;
-import com.provectus.kafka.ui.model.TopicConsumerGroups;
+import com.provectus.kafka.ui.model.ConsumerGroupOffsetsReset;
+import com.provectus.kafka.ui.model.PartitionOffset;
import com.provectus.kafka.ui.service.ClusterService;
+import com.provectus.kafka.ui.service.ClustersStorage;
+import com.provectus.kafka.ui.service.OffsetsResetService;
+import java.util.Map;
+import java.util.Optional;
import lombok.RequiredArgsConstructor;
import lombok.extern.log4j.Log4j2;
import org.springframework.http.ResponseEntity;
+import org.springframework.util.CollectionUtils;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Flux;
@@ -18,6 +28,15 @@ import reactor.core.publisher.Mono;
@Log4j2
public class ConsumerGroupsController implements ConsumerGroupsApi {
private final ClusterService clusterService;
+ private final OffsetsResetService offsetsResetService;
+ private final ClustersStorage clustersStorage;
+
+ @Override
+ public Mono> deleteConsumerGroup(String clusterName, String id,
+ ServerWebExchange exchange) {
+ return clusterService.deleteConsumerGroupById(clusterName, id)
+ .map(ResponseEntity::ok);
+ }
@Override
public Mono> getConsumerGroup(
@@ -37,9 +56,56 @@ public class ConsumerGroupsController implements ConsumerGroupsApi {
}
@Override
- public Mono> getTopicConsumerGroups(
+ public Mono>> getTopicConsumerGroups(
String clusterName, String topicName, ServerWebExchange exchange) {
- return clusterService.getTopicConsumerGroupDetail(clusterName, topicName)
- .map(ResponseEntity::ok);
+ return clusterService.getConsumerGroups(clusterName, Optional.of(topicName))
+ .map(Flux::fromIterable)
+ .map(ResponseEntity::ok)
+ .switchIfEmpty(Mono.just(ResponseEntity.notFound().build()));
}
+
+
+ @Override
+ public Mono> resetConsumerGroupOffsets(String clusterName, String group,
+ Mono
+ consumerGroupOffsetsReset,
+ ServerWebExchange exchange) {
+ return consumerGroupOffsetsReset.map(reset -> {
+ var cluster =
+ clustersStorage.getClusterByName(clusterName).orElseThrow(ClusterNotFoundException::new);
+
+ switch (reset.getResetType()) {
+ case EARLIEST:
+ offsetsResetService
+ .resetToEarliest(cluster, group, reset.getTopic(), reset.getPartitions());
+ break;
+ case LATEST:
+ offsetsResetService
+ .resetToLatest(cluster, group, reset.getTopic(), reset.getPartitions());
+ break;
+ case TIMESTAMP:
+ if (reset.getResetToTimestamp() == null) {
+ throw new ValidationException(
+ "resetToTimestamp is required when TIMESTAMP reset type used");
+ }
+ offsetsResetService
+ .resetToTimestamp(cluster, group, reset.getTopic(), reset.getPartitions(),
+ reset.getResetToTimestamp());
+ break;
+ case OFFSET:
+ if (CollectionUtils.isEmpty(reset.getPartitionsOffsets())) {
+ throw new ValidationException(
+ "partitionsOffsets is required when OFFSET reset type used");
+ }
+ Map offsets = reset.getPartitionsOffsets().stream()
+ .collect(toMap(PartitionOffset::getPartition, PartitionOffset::getOffset));
+ offsetsResetService.resetToOffsets(cluster, group, reset.getTopic(), offsets);
+ break;
+ default:
+ throw new ValidationException("Unknown resetType " + reset.getResetType());
+ }
+ return ResponseEntity.ok().build();
+ });
+ }
+
}
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/MessagesController.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/MessagesController.java
index 5dcb94a98922db2c3e5e2a668935dd9d5166f621..830168559cf14e41548c9f951f7f30837e5a1624 100644
--- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/MessagesController.java
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/MessagesController.java
@@ -2,8 +2,11 @@ package com.provectus.kafka.ui.controller;
import com.provectus.kafka.ui.api.MessagesApi;
import com.provectus.kafka.ui.model.ConsumerPosition;
+import com.provectus.kafka.ui.model.CreateTopicMessage;
+import com.provectus.kafka.ui.model.SeekDirection;
import com.provectus.kafka.ui.model.SeekType;
import com.provectus.kafka.ui.model.TopicMessage;
+import com.provectus.kafka.ui.model.TopicMessageSchema;
import com.provectus.kafka.ui.service.ClusterService;
import java.util.Collections;
import java.util.List;
@@ -13,6 +16,7 @@ import javax.validation.Valid;
import lombok.RequiredArgsConstructor;
import lombok.extern.log4j.Log4j2;
import org.apache.commons.lang3.tuple.Pair;
+import org.apache.kafka.common.TopicPartition;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.server.ServerWebExchange;
@@ -40,28 +44,50 @@ public class MessagesController implements MessagesApi {
@Override
public Mono>> getTopicMessages(
String clusterName, String topicName, @Valid SeekType seekType, @Valid List seekTo,
- @Valid Integer limit, @Valid String q, ServerWebExchange exchange) {
- return parseConsumerPosition(seekType, seekTo)
+ @Valid Integer limit, @Valid String q, @Valid SeekDirection seekDirection,
+ ServerWebExchange exchange) {
+ return parseConsumerPosition(topicName, seekType, seekTo, seekDirection)
.map(consumerPosition -> ResponseEntity
.ok(clusterService.getMessages(clusterName, topicName, consumerPosition, q, limit)));
}
- private Mono parseConsumerPosition(SeekType seekType, List seekTo) {
+ @Override
+ public Mono> getTopicSchema(
+ String clusterName, String topicName, ServerWebExchange exchange) {
+ return Mono.just(clusterService.getTopicSchema(clusterName, topicName))
+ .map(ResponseEntity::ok);
+ }
+
+ @Override
+ public Mono> sendTopicMessages(
+ String clusterName, String topicName, @Valid Mono createTopicMessage,
+ ServerWebExchange exchange) {
+ return createTopicMessage.flatMap(msg ->
+ clusterService.sendMessage(clusterName, topicName, msg)
+ ).map(ResponseEntity::ok);
+ }
+
+
+ private Mono parseConsumerPosition(
+ String topicName, SeekType seekType, List seekTo, SeekDirection seekDirection) {
return Mono.justOrEmpty(seekTo)
.defaultIfEmpty(Collections.emptyList())
.flatMapIterable(Function.identity())
.map(p -> {
- String[] splited = p.split("::");
- if (splited.length != 2) {
+ String[] split = p.split("::");
+ if (split.length != 2) {
throw new IllegalArgumentException(
"Wrong seekTo argument format. See API docs for details");
}
- return Pair.of(Integer.parseInt(splited[0]), Long.parseLong(splited[1]));
+ return Pair.of(
+ new TopicPartition(topicName, Integer.parseInt(split[0])),
+ Long.parseLong(split[1])
+ );
})
.collectMap(Pair::getKey, Pair::getValue)
.map(positions -> new ConsumerPosition(seekType != null ? seekType : SeekType.BEGINNING,
- positions));
+ positions, seekDirection));
}
}
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/StaticController.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/StaticController.java
new file mode 100644
index 0000000000000000000000000000000000000000..2b48d53a7a97e5eb5db689e734bd1d276bede59a
--- /dev/null
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/StaticController.java
@@ -0,0 +1,55 @@
+package com.provectus.kafka.ui.controller;
+
+import com.provectus.kafka.ui.util.ResourceUtil;
+import java.util.concurrent.atomic.AtomicReference;
+import lombok.RequiredArgsConstructor;
+import lombok.SneakyThrows;
+import lombok.extern.log4j.Log4j2;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.boot.autoconfigure.web.ServerProperties;
+import org.springframework.core.io.Resource;
+import org.springframework.http.ResponseEntity;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.RestController;
+import reactor.core.publisher.Mono;
+
+@RestController
+@RequiredArgsConstructor
+@Log4j2
+public class StaticController {
+ private final ServerProperties serverProperties;
+
+ @Value("classpath:static/index.html")
+ private Resource indexFile;
+ private final AtomicReference renderedIndexFile = new AtomicReference<>();
+
+ @GetMapping(value = "/index.html", produces = { "text/html" })
+ public Mono> getIndex() {
+ return Mono.just(ResponseEntity.ok(getRenderedIndexFile()));
+ }
+
+ public String getRenderedIndexFile() {
+ String rendered = renderedIndexFile.get();
+ if (rendered == null) {
+ rendered = buildIndexFile();
+ if (renderedIndexFile.compareAndSet(null, rendered)) {
+ return rendered;
+ } else {
+ return renderedIndexFile.get();
+ }
+ } else {
+ return rendered;
+ }
+ }
+
+ @SneakyThrows
+ private String buildIndexFile() {
+ final String contextPath = serverProperties.getServlet().getContextPath() != null
+ ? serverProperties.getServlet().getContextPath() : "";
+ final String staticPath = contextPath + "/static";
+ return ResourceUtil.readAsString(indexFile)
+ .replace("href=\"./static", "href=\"" + staticPath)
+ .replace("src=\"./static", "src=\"" + staticPath)
+ .replace("window.basePath=\"\"", "window.basePath=\"" + contextPath + "\"");
+ }
+}
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/TopicsController.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/TopicsController.java
index b7e934e7b85a33f466ed1602f6d8cb0143d92c21..707e2fac41a15af9b87442f06c977c88f29cddb5 100644
--- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/TopicsController.java
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/TopicsController.java
@@ -1,6 +1,10 @@
package com.provectus.kafka.ui.controller;
import com.provectus.kafka.ui.api.TopicsApi;
+import com.provectus.kafka.ui.model.PartitionsIncrease;
+import com.provectus.kafka.ui.model.PartitionsIncreaseResponse;
+import com.provectus.kafka.ui.model.ReplicationFactorChange;
+import com.provectus.kafka.ui.model.ReplicationFactorChangeResponse;
import com.provectus.kafka.ui.model.Topic;
import com.provectus.kafka.ui.model.TopicColumnsToSort;
import com.provectus.kafka.ui.model.TopicConfig;
@@ -86,4 +90,23 @@ public class TopicsController implements TopicsApi {
ServerWebExchange exchange) {
return clusterService.updateTopic(clusterId, topicName, topicUpdate).map(ResponseEntity::ok);
}
+
+ @Override
+ public Mono> increaseTopicPartitions(
+ String clusterName, String topicName,
+ Mono partitionsIncrease,
+ ServerWebExchange exchange) {
+ return partitionsIncrease.flatMap(
+ partitions -> clusterService.increaseTopicPartitions(clusterName, topicName, partitions))
+ .map(ResponseEntity::ok);
+ }
+
+ @Override
+ public Mono> changeReplicationFactor(
+ String clusterName, String topicName, Mono replicationFactorChange,
+ ServerWebExchange exchange) {
+ return replicationFactorChange
+ .flatMap(rfc -> clusterService.changeReplicationFactor(clusterName, topicName, rfc))
+ .map(ResponseEntity::ok);
+ }
}
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/deserialization/ProtobufFileRecordDeserializer.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/deserialization/ProtobufFileRecordDeserializer.java
deleted file mode 100644
index a2addd676e93165b73207c069edf411cced8ca95..0000000000000000000000000000000000000000
--- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/deserialization/ProtobufFileRecordDeserializer.java
+++ /dev/null
@@ -1,46 +0,0 @@
-package com.provectus.kafka.ui.deserialization;
-
-import com.fasterxml.jackson.core.type.TypeReference;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.protobuf.DynamicMessage;
-import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;
-import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchemaUtils;
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.util.Map;
-import java.util.stream.Collectors;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.common.utils.Bytes;
-
-public class ProtobufFileRecordDeserializer implements RecordDeserializer {
- private final ProtobufSchema protobufSchema;
- private final ObjectMapper objectMapper;
-
- public ProtobufFileRecordDeserializer(Path protobufSchemaPath, String messageName,
- ObjectMapper objectMapper) throws IOException {
- this.objectMapper = objectMapper;
- final String schemaString = Files.lines(protobufSchemaPath).collect(Collectors.joining());
- this.protobufSchema = new ProtobufSchema(schemaString).copy(messageName);
- }
-
- @Override
- public Object deserialize(ConsumerRecord record) {
- try {
- final DynamicMessage message = DynamicMessage.parseFrom(
- protobufSchema.toDescriptor(),
- new ByteArrayInputStream(record.value().get())
- );
- byte[] bytes = ProtobufSchemaUtils.toJson(message);
- return parseJson(bytes);
- } catch (Throwable e) {
- throw new RuntimeException("Failed to parse record from topic " + record.topic(), e);
- }
- }
-
- private Object parseJson(byte[] bytes) throws IOException {
- return objectMapper.readValue(bytes, new TypeReference