Merge branch 'master' into dependabot/npm_and_yarn/kafka-ui-react-app/typescript-5.0.4

This commit is contained in:
Roman Zabaluev 2023-05-11 14:31:12 +04:00 committed by GitHub
commit a669476968
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
301 changed files with 9320 additions and 5965 deletions

92
.github/ISSUE_TEMPLATE/bug.yml vendored Normal file
View file

@ -0,0 +1,92 @@
name: "\U0001F41E Bug report"
description: File a bug report
labels: ["status/triage", "type/bug"]
assignees: []
body:
- type: markdown
attributes:
value: |
Hi, thanks for raising the issue(-s), all contributions really matter!
Please, note that we'll close the issue without further explanation if you don't follow
this template and don't provide the information requested within this template.
- type: checkboxes
id: terms
attributes:
label: Issue submitter TODO list
description: By you checking these checkboxes we can be sure you've done the essential things.
options:
- label: I've looked up my issue in [FAQ](https://docs.kafka-ui.provectus.io/faq/common-problems)
required: true
- label: I've searched for an already existing issues [here](https://github.com/provectus/kafka-ui/issues)
required: true
- label: I've tried running `master`-labeled docker image and the issue still persists there
required: true
- label: I'm running a supported version of the application which is listed [here](https://github.com/provectus/kafka-ui/blob/master/SECURITY.md)
required: true
- type: textarea
attributes:
label: Describe the bug (actual behavior)
description: A clear and concise description of what the bug is. Use a list, if there is more than one problem
validations:
required: true
- type: textarea
attributes:
label: Expected behavior
description: A clear and concise description of what you expected to happen
validations:
required: false
- type: textarea
attributes:
label: Your installation details
description: |
How do you run the app? Please provide as much info as possible:
1. App version (commit hash in the top left corner of the UI)
2. Helm chart version, if you use one
3. Your application config. Please remove the sensitive info like passwords or API keys.
4. Any IAAC configs
validations:
required: true
- type: textarea
attributes:
label: Steps to reproduce
description: |
Please write down the order of the actions required to reproduce the issue.
For the advanced setups/complicated issue, we might need you to provide
a minimal [reproducible example](https://stackoverflow.com/help/minimal-reproducible-example).
validations:
required: true
- type: textarea
attributes:
label: Screenshots
description: |
If applicable, add screenshots to help explain your problem
validations:
required: false
- type: textarea
attributes:
label: Logs
description: |
If applicable, *upload* screenshots to help explain your problem
validations:
required: false
- type: textarea
attributes:
label: Additional context
description: |
Add any other context about the problem here. E.G.:
1. Are there any alternative scenarios (different data/methods/configuration/setup) you have tried?
Were they successful or the same issue occurred? Please provide steps as well.
2. Related issues (if there are any).
3. Logs (if available)
4. Is there any serious impact or behaviour on the end-user because of this issue, that can be overlooked?
validations:
required: false

View file

@ -1,64 +0,0 @@
---
name: "\U0001F41E Bug report"
about: Create a bug report
title: ''
labels: status/triage, type/bug
assignees: ''
---
<!--
We will close the issue without further explanation if you don't follow this template and don't provide the information requested within this template.
Don't forget to check for existing issues/discussions regarding your proposal. We might already have it.
https://github.com/provectus/kafka-ui/issues
https://github.com/provectus/kafka-ui/discussions
-->
<!--
Please follow the naming conventions for bugs:
<Feature/Area/Scope> : <Compact, but specific problem summary>
Avoid generic titles, like “Topics: incorrect layout of message sorting drop-down list”. Better use something like: “Topics: Message sorting drop-down list overlaps the "Submit" button”.
-->
**Describe the bug** (Actual behavior)
<!--(A clear and concise description of what the bug is.Use a list, if there is more than one problem)-->
**Expected behavior**
<!--(A clear and concise description of what you expected to happen.)-->
**Set up**
<!--
WE MIGHT CLOSE THE ISSUE without further explanation IF YOU DON'T PROVIDE THIS INFORMATION.
How do you run the app? Please provide as much info as possible:
1. App version (docker image version or check commit hash in the top left corner in UI)
2. Helm chart version, if you use one
3. Any IAAC configs
-->
**Steps to Reproduce**
<!-- We'd like you to provide an example setup (via docker-compose, helm, etc.)
to reproduce the problem, especially with a complex setups. -->
1.
**Screenshots**
<!--
(If applicable, add screenshots to help explain your problem)
-->
**Additional context**
<!--
Add any other context about the problem here. E.g.:
1. Are there any alternative scenarios (different data/methods/configuration/setup) you have tried?
Were they successfull or same issue occured? Please provide steps as well.
2. Related issues (if there are any).
3. Logs (if available)
4. Is there any serious impact or behaviour on the end-user because of this issue, that can be overlooked?
-->

11
.github/ISSUE_TEMPLATE/config.yml vendored Normal file
View file

@ -0,0 +1,11 @@
blank_issues_enabled: false
contact_links:
- name: Official documentation
url: https://docs.kafka-ui.provectus.io/
about: Before reaching out for support, please refer to our documentation. Read "FAQ" and "Common problems", also try using search there.
- name: Community Discord
url: https://discord.gg/4DWzD7pGE5
about: Chat with other users, get some support or ask questions.
- name: GitHub Discussions
url: https://github.com/provectus/kafka-ui/discussions
about: An alternative place to ask questions or to get some support.

66
.github/ISSUE_TEMPLATE/feature.yml vendored Normal file
View file

@ -0,0 +1,66 @@
name: "\U0001F680 Feature request"
description: Propose a new feature
labels: ["status/triage", "type/feature"]
assignees: []
body:
- type: markdown
attributes:
value: |
Hi, thanks for raising the issue(-s), all contributions really matter!
Please, note that we'll close the issue without further explanation if you don't follow
this template and don't provide the information requested within this template.
- type: checkboxes
id: terms
attributes:
label: Issue submitter TODO list
description: By you checking these checkboxes we can be sure you've done the essential things.
options:
- label: I've searched for an already existing issues [here](https://github.com/provectus/kafka-ui/issues)
required: true
- label: I'm running a supported version of the application which is listed [here](https://github.com/provectus/kafka-ui/blob/master/SECURITY.md) and the feature is not present there
required: true
- type: textarea
attributes:
label: Is your proposal related to a problem?
description: |
Provide a clear and concise description of what the problem is.
For example, "I'm always frustrated when..."
validations:
required: false
- type: textarea
attributes:
label: Describe the feature you're interested in
description: |
Provide a clear and concise description of what you want to happen.
validations:
required: true
- type: textarea
attributes:
label: Describe alternatives you've considered
description: |
Let us know about other solutions you've tried or researched.
validations:
required: false
- type: input
attributes:
label: Version you're running
description: |
Please provide the app version you're currently running:
1. App version (commit hash in the top left corner of the UI)
validations:
required: true
- type: textarea
attributes:
label: Additional context
description: |
Is there anything else you can add about the proposal?
You might want to link to related issues here, if you haven't already.
validations:
required: false

View file

@ -1,46 +0,0 @@
---
name: "\U0001F680 Feature request"
about: Propose a new feature
title: ''
labels: status/triage, type/feature
assignees: ''
---
<!--
Don't forget to check for existing issues/discussions regarding your proposal. We might already have it.
https://github.com/provectus/kafka-ui/issues
https://github.com/provectus/kafka-ui/discussions
-->
### Which version of the app are you running?
<!-- Please provide docker image version or check commit hash in the top left corner in UI) -->
### Is your proposal related to a problem?
<!--
Provide a clear and concise description of what the problem is.
For example, "I'm always frustrated when..."
-->
### Describe the solution you'd like
<!--
Provide a clear and concise description of what you want to happen.
-->
### Describe alternatives you've considered
<!--
Let us know about other solutions you've tried or researched.
-->
### Additional context
<!--
Is there anything else you can add about the proposal?
You might want to link to related issues here, if you haven't already.
-->

92
.github/ISSUE_TEMPLATE/helm.yml vendored Normal file
View file

@ -0,0 +1,92 @@
name: "⎈ K8s/Helm problem report"
description: "Report a problem with k8s/helm charts/etc"
labels: ["status/triage", "scope/k8s"]
assignees: []
body:
- type: markdown
attributes:
value: |
Hi, thanks for raising the issue(-s), all contributions really matter!
Please, note that we'll close the issue without further explanation if you don't follow
this template and don't provide the information requested within this template.
- type: checkboxes
id: terms
attributes:
label: Issue submitter TODO list
description: By you checking these checkboxes we can be sure you've done the essential things.
options:
- label: I've looked up my issue in [FAQ](https://docs.kafka-ui.provectus.io/faq/common-problems)
required: true
- label: I've searched for an already existing issues [here](https://github.com/provectus/kafka-ui/issues)
required: true
- label: I've tried running `master`-labeled docker image and the issue still persists there
required: true
- label: I'm running a supported version of the application which is listed [here](https://github.com/provectus/kafka-ui/blob/master/SECURITY.md)
required: true
- type: textarea
attributes:
label: Describe the bug (actual behavior)
description: A clear and concise description of what the bug is. Use a list, if there is more than one problem
validations:
required: true
- type: textarea
attributes:
label: Expected behavior
description: A clear and concise description of what you expected to happen
validations:
required: false
- type: textarea
attributes:
label: Your installation details
description: |
How do you run the app? Please provide as much info as possible:
1. App version (commit hash in the top left corner of the UI)
2. Helm chart version
3. Your application config. Please remove the sensitive info like passwords or API keys.
4. Any IAAC configs
validations:
required: true
- type: textarea
attributes:
label: Steps to reproduce
description: |
Please write down the order of the actions required to reproduce the issue.
For the advanced setups/complicated issue, we might need you to provide
a minimal [reproducible example](https://stackoverflow.com/help/minimal-reproducible-example).
validations:
required: true
- type: textarea
attributes:
label: Screenshots
description: |
If applicable, add screenshots to help explain your problem
validations:
required: false
- type: textarea
attributes:
label: Logs
description: |
If applicable, *upload* screenshots to help explain your problem
validations:
required: false
- type: textarea
attributes:
label: Additional context
description: |
Add any other context about the problem here. E.G.:
1. Are there any alternative scenarios (different data/methods/configuration/setup) you have tried?
Were they successful or the same issue occurred? Please provide steps as well.
2. Related issues (if there are any).
3. Logs (if available)
4. Is there any serious impact or behaviour on the end-user because of this issue, that can be overlooked?
validations:
required: false

View file

@ -1,52 +0,0 @@
---
name: "⎈ K8s/Helm problem report"
about: Report a problem with k8s/helm charts/etc
title: ''
labels: scope/k8s, status/triage
assignees: azatsafin
---
<!--
Don't forget to check for existing issues/discussions regarding your proposal. We might already have it.
https://github.com/provectus/kafka-ui/issues
https://github.com/provectus/kafka-ui/discussions
-->
**Describe the bug**
<!--(A clear and concise description of what the bug is.)-->
**Set up**
<!--
How do you run the app? Please provide as much info as possible:
1. App version (docker image version or check commit hash in the top left corner in UI)
2. Helm chart version, if you use one
3. Any IAAC configs
We might close the issue without further explanation if you don't provide such information.
-->
**Steps to Reproduce**
Steps to reproduce the behavior:
1.
**Expected behavior**
<!--
(A clear and concise description of what you expected to happen)
-->
**Screenshots**
<!--
(If applicable, add screenshots to help explain your problem)
-->
**Additional context**
<!--
(Add any other context about the problem here)
-->

View file

@ -1,16 +0,0 @@
---
name: "❓ Question"
about: Ask a question
title: ''
---
<!--
To ask a question, please either:
1. Open up a discussion (https://github.com/provectus/kafka-ui/discussions)
2. Join us on discord (https://discord.gg/4DWzD7pGE5) and ask there.
Don't forget to check/search for existing issues/discussions.
-->

View file

@ -8,8 +8,6 @@ updates:
timezone: Europe/Moscow
reviewers:
- "Haarolean"
assignees:
- "Haarolean"
labels:
- "scope/backend"
- "type/dependencies"
@ -99,8 +97,6 @@ updates:
timezone: Europe/Moscow
reviewers:
- "Haarolean"
assignees:
- "Haarolean"
labels:
- "scope/infrastructure"
- "type/dependencies"

View file

@ -16,18 +16,26 @@ exclude-labels:
- 'type/refactoring'
categories:
- title: '🚩 Breaking Changes'
labels:
- 'impact/changelog'
- title: '⚙Features'
labels:
- 'type/feature'
- title: '🪛Enhancements'
labels:
- 'type/enhancement'
- title: '🔨Bug Fixes'
labels:
- 'type/bug'
- title: 'Security'
labels:
- 'type/security'
- title: '⎈ Helm/K8S Changes'
labels:
- 'scope/k8s'

View file

@ -6,7 +6,7 @@ jobs:
block_merge:
runs-on: ubuntu-latest
steps:
- uses: mheap/github-action-required-labels@v3
- uses: mheap/github-action-required-labels@v4
with:
mode: exactly
count: 0

View file

@ -86,7 +86,7 @@ jobs:
- name: make comment with private deployment link
if: ${{ github.event.label.name == 'status/feature_testing' }}
uses: peter-evans/create-or-update-comment@v2
uses: peter-evans/create-or-update-comment@v3
with:
issue-number: ${{ github.event.pull_request.number }}
body: |
@ -94,7 +94,7 @@ jobs:
- name: make comment with public deployment link
if: ${{ github.event.label.name == 'status/feature_testing_public' }}
uses: peter-evans/create-or-update-comment@v2
uses: peter-evans/create-or-update-comment@v3
with:
issue-number: ${{ github.event.pull_request.number }}
body: |

View file

@ -21,7 +21,7 @@ jobs:
git add ../kafka-ui-from-branch/
git commit -m "removed env:${{ needs.build.outputs.deploy }}" && git push || true
- name: make comment with deployment link
uses: peter-evans/create-or-update-comment@v2
uses: peter-evans/create-or-update-comment@v3
with:
issue-number: ${{ github.event.pull_request.number }}
body: |

View file

@ -65,7 +65,7 @@ jobs:
cache-from: type=local,src=/tmp/.buildx-cache
cache-to: type=local,dest=/tmp/.buildx-cache
- name: make comment with private deployment link
uses: peter-evans/create-or-update-comment@v2
uses: peter-evans/create-or-update-comment@v3
with:
issue-number: ${{ github.event.pull_request.number }}
body: |

View file

@ -55,7 +55,7 @@ jobs:
cache-to: type=local,dest=/tmp/.buildx-cache
- name: Run CVE checks
uses: aquasecurity/trivy-action@0.9.2
uses: aquasecurity/trivy-action@0.10.0
with:
image-ref: "provectuslabs/kafka-ui:${{ steps.build.outputs.version }}"
format: "table"

View file

@ -33,7 +33,7 @@ jobs:
--image-ids imageTag=${{ steps.extract_branch.outputs.tag }} \
--region us-east-1
- name: make comment with private deployment link
uses: peter-evans/create-or-update-comment@v2
uses: peter-evans/create-or-update-comment@v3
with:
issue-number: ${{ github.event.pull_request.number }}
body: |

View file

@ -2,18 +2,33 @@ name: Release Drafter
on:
push:
# branches to consider in the event; optional, defaults to all
branches:
- master
workflow_dispatch:
inputs:
version:
description: 'Release version'
required: false
branch:
description: 'Target branch'
required: false
default: 'master'
permissions:
contents: read
jobs:
update_release_draft:
runs-on: ubuntu-latest
permissions:
contents: write
pull-requests: write
steps:
- uses: release-drafter/release-drafter@v5
with:
config-name: release_drafter.yaml
disable-autolabeler: true
version: ${{ github.event.inputs.version }}
commitish: ${{ github.event.inputs.branch }}
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}

View file

@ -7,7 +7,7 @@ jobs:
stale:
runs-on: ubuntu-latest
steps:
- uses: actions/stale@v7
- uses: actions/stale@v8
with:
days-before-issue-stale: 7
days-before-issue-close: 3

View file

@ -6,8 +6,9 @@ Following versions of the project are currently being supported with security up
| Version | Supported |
| ------- | ------------------ |
| 0.5.x | :white_check_mark: |
| 0.4.x | :x: |
| 0.6.x | :white_check_mark: |
| 0.5.x | :x: |
| 0.4.x | :x: |
| 0.3.x | :x: |
| 0.2.x | :x: |
| 0.1.x | :x: |

View file

@ -2,6 +2,6 @@ apiVersion: v2
name: kafka-ui
description: A Helm chart for kafka-UI
type: application
version: 0.6.1
appVersion: v0.6.1
version: 0.6.2
appVersion: v0.6.2
icon: https://github.com/provectus/kafka-ui/raw/master/documentation/images/kafka-ui-logo.png

View file

@ -11,4 +11,8 @@ KafkaClient {
user_admin="admin-secret";
};
Client {};
Client {
org.apache.zookeeper.server.auth.DigestLoginModule required
username="zkuser"
password="zkuserpassword";
};

View file

@ -0,0 +1,4 @@
Server {
org.apache.zookeeper.server.auth.DigestLoginModule required
user_zkuser="zkuserpassword";
};

View file

@ -0,0 +1,59 @@
---
version: '2'
services:
kafka-ui:
container_name: kafka-ui
image: provectuslabs/kafka-ui:latest
ports:
- 8080:8080
depends_on:
- zookeeper
- kafka
environment:
KAFKA_CLUSTERS_0_NAME: local
KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:29092
KAFKA_CLUSTERS_0_PROPERTIES_SECURITY_PROTOCOL: SASL_PLAINTEXT
KAFKA_CLUSTERS_0_PROPERTIES_SASL_MECHANISM: PLAIN
KAFKA_CLUSTERS_0_PROPERTIES_SASL_JAAS_CONFIG: 'org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin-secret";'
zookeeper:
image: wurstmeister/zookeeper:3.4.6
environment:
JVMFLAGS: "-Djava.security.auth.login.config=/etc/zookeeper/zookeeper_jaas.conf"
volumes:
- ./jaas/zookeeper_jaas.conf:/etc/zookeeper/zookeeper_jaas.conf
ports:
- 2181:2181
kafka:
image: confluentinc/cp-kafka:7.2.1
hostname: kafka
container_name: kafka
ports:
- "9092:9092"
- "9997:9997"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,SASL_PLAINTEXT:SASL_PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT'
KAFKA_ADVERTISED_LISTENERS: 'SASL_PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092'
KAFKA_OPTS: "-Djava.security.auth.login.config=/etc/kafka/jaas/kafka_server.conf"
KAFKA_AUTHORIZER_CLASS_NAME: "kafka.security.authorizer.AclAuthorizer"
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_JMX_PORT: 9997
KAFKA_JMX_HOSTNAME: localhost
KAFKA_NODE_ID: 1
KAFKA_CONTROLLER_QUORUM_VOTERS: '1@kafka:29093'
KAFKA_LISTENERS: 'SASL_PLAINTEXT://kafka:29092,CONTROLLER://kafka:29093,PLAINTEXT_HOST://0.0.0.0:9092'
KAFKA_INTER_BROKER_LISTENER_NAME: 'SASL_PLAINTEXT'
KAFKA_SASL_ENABLED_MECHANISMS: 'PLAIN'
KAFKA_SASL_MECHANISM_INTER_BROKER_PROTOCOL: 'PLAIN'
KAFKA_SECURITY_PROTOCOL: 'SASL_PLAINTEXT'
KAFKA_SUPER_USERS: 'User:admin'
volumes:
- ./scripts/update_run.sh:/tmp/update_run.sh
- ./jaas:/etc/kafka/jaas

View file

@ -9,4 +9,6 @@ message MySpecificTopicValue {
message MyValue {
int32 version = 1;
string payload = 2;
map<int32, string> intToStringMap = 3;
map<string, MyValue> strToObjMap = 4;
}

View file

@ -0,0 +1,333 @@
<?xml version="1.0"?>
<!DOCTYPE module PUBLIC
"-//Checkstyle//DTD Checkstyle Configuration 1.3//EN"
"https://checkstyle.org/dtds/configuration_1_3.dtd">
<!--
Checkstyle configuration that checks the Google coding conventions from Google Java Style
that can be found at https://google.github.io/styleguide/javaguide.html
Checkstyle is very configurable. Be sure to read the documentation at
http://checkstyle.org (or in your downloaded distribution).
To completely disable a check, just comment it out or delete it from the file.
To suppress certain violations please review suppression filters.
Authors: Max Vetrenko, Ruslan Diachenko, Roman Ivanov.
-->
<module name = "Checker">
<property name="charset" value="UTF-8"/>
<property name="severity" value="warning"/>
<property name="fileExtensions" value="java, properties, xml"/>
<!-- Excludes all 'module-info.java' files -->
<!-- See https://checkstyle.org/config_filefilters.html -->
<module name="BeforeExecutionExclusionFileFilter">
<property name="fileNamePattern" value="module\-info\.java$"/>
</module>
<!-- https://checkstyle.org/config_filters.html#SuppressionFilter -->
<module name="SuppressionFilter">
<property name="file" value="${org.checkstyle.google.suppressionfilter.config}"
default="checkstyle-suppressions.xml" />
<property name="optional" value="true"/>
</module>
<!-- Checks for whitespace -->
<!-- See http://checkstyle.org/config_whitespace.html -->
<module name="FileTabCharacter">
<property name="eachLine" value="true"/>
</module>
<module name="LineLength">
<property name="fileExtensions" value="java"/>
<property name="max" value="120"/>
<property name="ignorePattern" value="^package.*|^import.*|a href|href|http://|https://|ftp://"/>
</module>
<module name="TreeWalker">
<module name="OuterTypeFilename"/>
<module name="IllegalTokenText">
<property name="tokens" value="STRING_LITERAL, CHAR_LITERAL"/>
<property name="format"
value="\\u00(09|0(a|A)|0(c|C)|0(d|D)|22|27|5(C|c))|\\(0(10|11|12|14|15|42|47)|134)"/>
<property name="message"
value="Consider using special escape sequence instead of octal value or Unicode escaped value."/>
</module>
<module name="AvoidEscapedUnicodeCharacters">
<property name="allowEscapesForControlCharacters" value="true"/>
<property name="allowByTailComment" value="true"/>
<property name="allowNonPrintableEscapes" value="true"/>
</module>
<module name="AvoidStarImport"/>
<module name="OneTopLevelClass"/>
<module name="NoLineWrap">
<property name="tokens" value="PACKAGE_DEF, IMPORT, STATIC_IMPORT"/>
</module>
<module name="EmptyBlock">
<property name="option" value="TEXT"/>
<property name="tokens"
value="LITERAL_TRY, LITERAL_FINALLY, LITERAL_IF, LITERAL_ELSE, LITERAL_SWITCH"/>
</module>
<module name="NeedBraces">
<property name="tokens"
value="LITERAL_DO, LITERAL_ELSE, LITERAL_FOR, LITERAL_IF, LITERAL_WHILE"/>
</module>
<module name="LeftCurly">
<property name="tokens"
value="ANNOTATION_DEF, CLASS_DEF, CTOR_DEF, ENUM_CONSTANT_DEF, ENUM_DEF,
INTERFACE_DEF, LAMBDA, LITERAL_CASE, LITERAL_CATCH, LITERAL_DEFAULT,
LITERAL_DO, LITERAL_ELSE, LITERAL_FINALLY, LITERAL_FOR, LITERAL_IF,
LITERAL_SWITCH, LITERAL_SYNCHRONIZED, LITERAL_TRY, LITERAL_WHILE, METHOD_DEF,
OBJBLOCK, STATIC_INIT"/>
</module>
<module name="RightCurly">
<property name="id" value="RightCurlySame"/>
<property name="tokens"
value="LITERAL_TRY, LITERAL_CATCH, LITERAL_FINALLY, LITERAL_IF, LITERAL_ELSE,
LITERAL_DO"/>
</module>
<module name="RightCurly">
<property name="id" value="RightCurlyAlone"/>
<property name="option" value="alone"/>
<property name="tokens"
value="CLASS_DEF, METHOD_DEF, CTOR_DEF, LITERAL_FOR, LITERAL_WHILE, STATIC_INIT,
INSTANCE_INIT, ANNOTATION_DEF, ENUM_DEF"/>
</module>
<module name="SuppressionXpathSingleFilter">
<!-- suppresion is required till https://github.com/checkstyle/checkstyle/issues/7541 -->
<property name="id" value="RightCurlyAlone"/>
<property name="query" value="//RCURLY[parent::SLIST[count(./*)=1]
or preceding-sibling::*[last()][self::LCURLY]]"/>
</module>
<module name="WhitespaceAfter">
<property name="tokens"
value="COMMA, SEMI, TYPECAST, LITERAL_IF, LITERAL_ELSE,
LITERAL_WHILE, LITERAL_DO, LITERAL_FOR, DO_WHILE"/>
</module>
<module name="WhitespaceAround">
<property name="allowEmptyConstructors" value="true"/>
<property name="allowEmptyLambdas" value="true"/>
<property name="allowEmptyMethods" value="true"/>
<property name="allowEmptyTypes" value="true"/>
<property name="allowEmptyLoops" value="true"/>
<property name="tokens"
value="ASSIGN, BAND, BAND_ASSIGN, BOR, BOR_ASSIGN, BSR, BSR_ASSIGN, BXOR,
BXOR_ASSIGN, COLON, DIV, DIV_ASSIGN, DO_WHILE, EQUAL, GE, GT, LAMBDA, LAND,
LCURLY, LE, LITERAL_CATCH, LITERAL_DO, LITERAL_ELSE, LITERAL_FINALLY,
LITERAL_FOR, LITERAL_IF, LITERAL_RETURN, LITERAL_SWITCH, LITERAL_SYNCHRONIZED,
LITERAL_TRY, LITERAL_WHILE, LOR, LT, MINUS, MINUS_ASSIGN, MOD, MOD_ASSIGN,
NOT_EQUAL, PLUS, PLUS_ASSIGN, QUESTION, RCURLY, SL, SLIST, SL_ASSIGN, SR,
SR_ASSIGN, STAR, STAR_ASSIGN, LITERAL_ASSERT, TYPE_EXTENSION_AND"/>
<message key="ws.notFollowed"
value="WhitespaceAround: ''{0}'' is not followed by whitespace. Empty blocks may only be represented as '{}' when not part of a multi-block statement (4.1.3)"/>
<message key="ws.notPreceded"
value="WhitespaceAround: ''{0}'' is not preceded with whitespace."/>
</module>
<module name="OneStatementPerLine"/>
<!-- <module name="MultipleVariableDeclarations"/>-->
<module name="ArrayTypeStyle"/>
<module name="MissingSwitchDefault"/>
<module name="FallThrough"/>
<module name="UpperEll"/>
<module name="ModifierOrder"/>
<module name="EmptyLineSeparator">
<property name="tokens"
value="PACKAGE_DEF, IMPORT, STATIC_IMPORT, CLASS_DEF, INTERFACE_DEF, ENUM_DEF,
STATIC_INIT, INSTANCE_INIT, METHOD_DEF, CTOR_DEF, VARIABLE_DEF"/>
<property name="allowNoEmptyLineBetweenFields" value="true"/>
</module>
<module name="SeparatorWrap">
<property name="id" value="SeparatorWrapDot"/>
<property name="tokens" value="DOT"/>
<property name="option" value="nl"/>
</module>
<module name="SeparatorWrap">
<property name="id" value="SeparatorWrapComma"/>
<property name="tokens" value="COMMA"/>
<property name="option" value="EOL"/>
</module>
<module name="SeparatorWrap">
<!-- ELLIPSIS is EOL until https://github.com/google/styleguide/issues/258 -->
<property name="id" value="SeparatorWrapEllipsis"/>
<property name="tokens" value="ELLIPSIS"/>
<property name="option" value="EOL"/>
</module>
<module name="SeparatorWrap">
<!-- ARRAY_DECLARATOR is EOL until https://github.com/google/styleguide/issues/259 -->
<property name="id" value="SeparatorWrapArrayDeclarator"/>
<property name="tokens" value="ARRAY_DECLARATOR"/>
<property name="option" value="EOL"/>
</module>
<module name="SeparatorWrap">
<property name="id" value="SeparatorWrapMethodRef"/>
<property name="tokens" value="METHOD_REF"/>
<property name="option" value="nl"/>
</module>
<module name="PackageName">
<property name="format" value="^[a-z]+(\.[a-z][a-z0-9]*)*$"/>
<message key="name.invalidPattern"
value="Package name ''{0}'' must match pattern ''{1}''."/>
</module>
<module name="TypeName">
<property name="tokens" value="CLASS_DEF, INTERFACE_DEF, ENUM_DEF, ANNOTATION_DEF"/>
<message key="name.invalidPattern"
value="Type name ''{0}'' must match pattern ''{1}''."/>
</module>
<module name="MemberName">
<property name="format" value="^[a-z][a-z0-9][a-zA-Z0-9]*$"/>
<message key="name.invalidPattern"
value="Member name ''{0}'' must match pattern ''{1}''."/>
</module>
<module name="ParameterName">
<property name="format" value="^[a-z]([a-z0-9][a-zA-Z0-9]*)?$"/>
<message key="name.invalidPattern"
value="Parameter name ''{0}'' must match pattern ''{1}''."/>
</module>
<module name="LambdaParameterName">
<property name="format" value="^[a-z]([a-z0-9][a-zA-Z0-9]*)?$"/>
<message key="name.invalidPattern"
value="Lambda parameter name ''{0}'' must match pattern ''{1}''."/>
</module>
<module name="CatchParameterName">
<property name="format" value="^[a-z]([a-z0-9][a-zA-Z0-9]*)?$"/>
<message key="name.invalidPattern"
value="Catch parameter name ''{0}'' must match pattern ''{1}''."/>
</module>
<module name="LocalVariableName">
<property name="format" value="^[a-z]([a-z0-9][a-zA-Z0-9]*)?$"/>
<message key="name.invalidPattern"
value="Local variable name ''{0}'' must match pattern ''{1}''."/>
</module>
<module name="ClassTypeParameterName">
<property name="format" value="(^[A-Z][0-9]?)$|([A-Z][a-zA-Z0-9]*[T]$)"/>
<message key="name.invalidPattern"
value="Class type name ''{0}'' must match pattern ''{1}''."/>
</module>
<module name="MethodTypeParameterName">
<property name="format" value="(^[A-Z][0-9]?)$|([A-Z][a-zA-Z0-9]*[T]$)"/>
<message key="name.invalidPattern"
value="Method type name ''{0}'' must match pattern ''{1}''."/>
</module>
<module name="InterfaceTypeParameterName">
<property name="format" value="(^[A-Z][0-9]?)$|([A-Z][a-zA-Z0-9]*[T]$)"/>
<message key="name.invalidPattern"
value="Interface type name ''{0}'' must match pattern ''{1}''."/>
</module>
<module name="NoFinalizer"/>
<module name="GenericWhitespace">
<message key="ws.followed"
value="GenericWhitespace ''{0}'' is followed by whitespace."/>
<message key="ws.preceded"
value="GenericWhitespace ''{0}'' is preceded with whitespace."/>
<message key="ws.illegalFollow"
value="GenericWhitespace ''{0}'' should followed by whitespace."/>
<message key="ws.notPreceded"
value="GenericWhitespace ''{0}'' is not preceded with whitespace."/>
</module>
<module name="Indentation">
<property name="basicOffset" value="2"/>
<property name="braceAdjustment" value="0"/>
<property name="caseIndent" value="2"/>
<property name="throwsIndent" value="4"/>
<property name="lineWrappingIndentation" value="4"/>
<property name="arrayInitIndent" value="2"/>
</module>
<module name="AbbreviationAsWordInName">
<property name="ignoreFinal" value="false"/>
<property name="allowedAbbreviationLength" value="1"/>
<property name="tokens"
value="CLASS_DEF, INTERFACE_DEF, ENUM_DEF, ANNOTATION_DEF, ANNOTATION_FIELD_DEF,
PARAMETER_DEF, VARIABLE_DEF, METHOD_DEF"/>
</module>
<module name="OverloadMethodsDeclarationOrder"/>
<!-- <module name="VariableDeclarationUsageDistance"/>-->
<module name="CustomImportOrder">
<property name="sortImportsInGroupAlphabetically" value="true"/>
<property name="separateLineBetweenGroups" value="true"/>
<property name="customImportOrderRules" value="STATIC###THIRD_PARTY_PACKAGE"/>
<property name="tokens" value="IMPORT, STATIC_IMPORT, PACKAGE_DEF"/>
</module>
<module name="MethodParamPad">
<property name="tokens"
value="CTOR_DEF, LITERAL_NEW, METHOD_CALL, METHOD_DEF,
SUPER_CTOR_CALL, ENUM_CONSTANT_DEF"/>
</module>
<module name="NoWhitespaceBefore">
<property name="tokens"
value="COMMA, SEMI, POST_INC, POST_DEC, DOT, ELLIPSIS,
LABELED_STAT, METHOD_REF"/>
<property name="allowLineBreaks" value="true"/>
</module>
<module name="ParenPad">
<property name="tokens"
value="ANNOTATION, ANNOTATION_FIELD_DEF, CTOR_CALL, CTOR_DEF, DOT, ENUM_CONSTANT_DEF,
EXPR, LITERAL_CATCH, LITERAL_DO, LITERAL_FOR, LITERAL_IF, LITERAL_NEW,
LITERAL_SWITCH, LITERAL_SYNCHRONIZED, LITERAL_WHILE, METHOD_CALL,
METHOD_DEF, QUESTION, RESOURCE_SPECIFICATION, SUPER_CTOR_CALL, LAMBDA"/>
</module>
<module name="OperatorWrap">
<property name="option" value="NL"/>
<property name="tokens"
value="BAND, BOR, BSR, BXOR, DIV, EQUAL, GE, GT, LAND, LE, LITERAL_INSTANCEOF, LOR,
LT, MINUS, MOD, NOT_EQUAL, PLUS, QUESTION, SL, SR, STAR, METHOD_REF "/>
</module>
<module name="AnnotationLocation">
<property name="id" value="AnnotationLocationMostCases"/>
<property name="tokens"
value="CLASS_DEF, INTERFACE_DEF, ENUM_DEF, METHOD_DEF, CTOR_DEF"/>
</module>
<module name="AnnotationLocation">
<property name="id" value="AnnotationLocationVariables"/>
<property name="tokens" value="VARIABLE_DEF"/>
<property name="allowSamelineMultipleAnnotations" value="true"/>
</module>
<module name="NonEmptyAtclauseDescription"/>
<module name="InvalidJavadocPosition"/>
<module name="JavadocTagContinuationIndentation"/>
<module name="SummaryJavadoc">
<property name="forbiddenSummaryFragments"
value="^@return the *|^This method returns |^A [{]@code [a-zA-Z0-9]+[}]( is a )"/>
</module>
<module name="JavadocParagraph"/>
<module name="AtclauseOrder">
<property name="tagOrder" value="@param, @return, @throws, @deprecated"/>
<property name="target"
value="CLASS_DEF, INTERFACE_DEF, ENUM_DEF, METHOD_DEF, CTOR_DEF, VARIABLE_DEF"/>
</module>
<module name="JavadocMethod">
<property name="accessModifiers" value="public"/>
<property name="allowMissingParamTags" value="true"/>
<property name="allowMissingReturnTag" value="true"/>
<property name="allowedAnnotations" value="Override, Test"/>
<property name="tokens" value="METHOD_DEF, CTOR_DEF, ANNOTATION_FIELD_DEF"/>
</module>
<!-- <module name="MissingJavadocMethod">-->
<!-- <property name="scope" value="public"/>-->
<!-- <property name="minLineCount" value="2"/>-->
<!-- <property name="allowedAnnotations" value="Override, Test"/>-->
<!-- <property name="tokens" value="METHOD_DEF, CTOR_DEF, ANNOTATION_FIELD_DEF"/>-->
<!-- </module>-->
<module name="MethodName">
<property name="format" value="^[a-z][a-z0-9][a-zA-Z0-9_]*$"/>
<message key="name.invalidPattern"
value="Method name ''{0}'' must match pattern ''{1}''."/>
</module>
<module name="SingleLineJavadoc">
<property name="ignoreInlineTags" value="false"/>
</module>
<module name="EmptyCatchBlock">
<property name="exceptionVariableName" value="ignored"/>
</module>
<module name="CommentsIndentation">
<property name="tokens" value="SINGLE_LINE_COMMENT, BLOCK_COMMENT_BEGIN"/>
</module>
<!-- https://checkstyle.org/config_filters.html#SuppressionXpathFilter -->
<module name="SuppressionXpathFilter">
<property name="file" value="${org.checkstyle.google.suppressionxpathfilter.config}"
default="checkstyle-xpath-suppressions.xml" />
<property name="optional" value="true"/>
</module>
</module>
</module>

View file

@ -318,7 +318,7 @@
<property name="ignoreInlineTags" value="false"/>
</module>
<module name="EmptyCatchBlock">
<property name="exceptionVariableName" value="expected"/>
<property name="exceptionVariableName" value="ignored"/>
</module>
<module name="CommentsIndentation">
<property name="tokens" value="SINGLE_LINE_COMMENT, BLOCK_COMMENT_BEGIN"/>
@ -330,4 +330,4 @@
<property name="optional" value="true"/>
</module>
</module>
</module>
</module>

View file

@ -12,7 +12,7 @@
<artifactId>kafka-ui-api</artifactId>
<properties>
<jacoco.version>0.8.8</jacoco.version>
<jacoco.version>0.8.10</jacoco.version>
<sonar.java.coveragePlugin>jacoco</sonar.java.coveragePlugin>
<sonar.dynamicAnalysis>reuseReports</sonar.dynamicAnalysis>
<sonar.jacoco.reportPath>${project.basedir}/target/jacoco.exec</sonar.jacoco.reportPath>
@ -21,6 +21,12 @@
</properties>
<dependencies>
<dependency>
<!--TODO: remove, when spring-boot fixed dependency to 6.0.8+ (6.0.7 has CVE) -->
<groupId>org.springframework</groupId>
<artifactId>spring-core</artifactId>
<version>6.0.8</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
@ -109,6 +115,12 @@
<groupId>io.projectreactor.addons</groupId>
<artifactId>reactor-extra</artifactId>
</dependency>
<!-- https://github.com/provectus/kafka-ui/pull/3693 -->
<dependency>
<groupId>org.json</groupId>
<artifactId>json</artifactId>
<version>${org.json.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>

View file

@ -27,6 +27,8 @@ public class ClustersProperties {
String internalTopicPrefix;
Integer adminClientTimeout;
PollingProperties polling = new PollingProperties();
@Data
@ -56,6 +58,8 @@ public class ClustersProperties {
Integer pollTimeoutMs;
Integer partitionPollTimeout;
Integer noDataEmptyPolls;
Integer maxPageSize;
Integer defaultPageSize;
}
@Data
@ -127,8 +131,9 @@ public class ClustersProperties {
@Data
public static class Masking {
Type type;
List<String> fields; //if null or empty list - policy will be applied to all fields
List<String> pattern; //used when type=MASK
List<String> fields;
String fieldsNamePattern;
List<String> maskingCharsReplacement; //used when type=MASK
String replacement; //used when type=REPLACE
String topicKeysPattern;
String topicValuesPattern;

View file

@ -5,7 +5,6 @@ import java.util.Map;
import lombok.AllArgsConstructor;
import org.openapitools.jackson.nullable.JsonNullableModule;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.web.ServerProperties;
import org.springframework.boot.autoconfigure.web.reactive.WebFluxProperties;
import org.springframework.context.ApplicationContext;
@ -15,8 +14,6 @@ import org.springframework.http.server.reactive.ContextPathCompositeHandler;
import org.springframework.http.server.reactive.HttpHandler;
import org.springframework.jmx.export.MBeanExporter;
import org.springframework.util.StringUtils;
import org.springframework.util.unit.DataSize;
import org.springframework.web.reactive.function.client.WebClient;
import org.springframework.web.server.adapter.WebHttpHandlerBuilder;
@Configuration
@ -52,14 +49,7 @@ public class Config {
}
@Bean
public WebClient webClient(
@Value("${webclient.max-in-memory-buffer-size:20MB}") DataSize maxBuffSize) {
return WebClient.builder()
.codecs(c -> c.defaultCodecs().maxInMemorySize((int) maxBuffSize.toBytes()))
.build();
}
@Bean
// will be used by webflux json mapping
public JsonNullableModule jsonNullableModule() {
return new JsonNullableModule();
}

View file

@ -0,0 +1,33 @@
package com.provectus.kafka.ui.config;
import com.provectus.kafka.ui.exception.ValidationException;
import java.beans.Transient;
import javax.annotation.PostConstruct;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
import org.springframework.util.unit.DataSize;
@Configuration
@ConfigurationProperties("webclient")
@Data
public class WebclientProperties {
String maxInMemoryBufferSize;
@PostConstruct
public void validate() {
validateAndSetDefaultBufferSize();
}
private void validateAndSetDefaultBufferSize() {
if (maxInMemoryBufferSize != null) {
try {
DataSize.parse(maxInMemoryBufferSize);
} catch (Exception e) {
throw new ValidationException("Invalid format for webclient.maxInMemoryBufferSize");
}
}
}
}

View file

@ -0,0 +1,24 @@
package com.provectus.kafka.ui.config.auth;
import lombok.Data;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.properties.ConfigurationProperties;
@ConfigurationProperties("spring.ldap")
@Data
public class LdapProperties {
private String urls;
private String base;
private String adminUser;
private String adminPassword;
private String userFilterSearchBase;
private String userFilterSearchFilter;
private String groupFilterSearchBase;
@Value("${oauth2.ldap.activeDirectory:false}")
private boolean isActiveDirectory;
@Value("${oauth2.ldap.aсtiveDirectory.domain:@null}")
private String activeDirectoryDomain;
}

View file

@ -1,13 +1,21 @@
package com.provectus.kafka.ui.config.auth;
import static com.provectus.kafka.ui.config.auth.AbstractAuthSecurityConfig.AUTH_WHITELIST;
import com.provectus.kafka.ui.service.rbac.AccessControlService;
import java.util.Collection;
import java.util.List;
import javax.annotation.Nullable;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.autoconfigure.ldap.LdapAutoConfiguration;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.context.annotation.Primary;
import org.springframework.ldap.core.DirContextOperations;
import org.springframework.ldap.core.support.BaseLdapPathContextSource;
import org.springframework.ldap.core.support.LdapContextSource;
import org.springframework.security.authentication.AuthenticationManager;
@ -16,91 +24,118 @@ import org.springframework.security.authentication.ReactiveAuthenticationManager
import org.springframework.security.authentication.ReactiveAuthenticationManagerAdapter;
import org.springframework.security.config.annotation.web.reactive.EnableWebFluxSecurity;
import org.springframework.security.config.web.server.ServerHttpSecurity;
import org.springframework.security.core.GrantedAuthority;
import org.springframework.security.core.userdetails.UserDetails;
import org.springframework.security.ldap.authentication.AbstractLdapAuthenticationProvider;
import org.springframework.security.ldap.authentication.BindAuthenticator;
import org.springframework.security.ldap.authentication.LdapAuthenticationProvider;
import org.springframework.security.ldap.authentication.ad.ActiveDirectoryLdapAuthenticationProvider;
import org.springframework.security.ldap.search.FilterBasedLdapUserSearch;
import org.springframework.security.ldap.search.LdapUserSearch;
import org.springframework.security.ldap.userdetails.DefaultLdapAuthoritiesPopulator;
import org.springframework.security.ldap.userdetails.LdapAuthoritiesPopulator;
import org.springframework.security.ldap.userdetails.LdapUserDetailsMapper;
import org.springframework.security.web.server.SecurityWebFilterChain;
@Configuration
@EnableWebFluxSecurity
@ConditionalOnProperty(value = "auth.type", havingValue = "LDAP")
@Import(LdapAutoConfiguration.class)
@EnableConfigurationProperties(LdapProperties.class)
@RequiredArgsConstructor
@Slf4j
public class LdapSecurityConfig extends AbstractAuthSecurityConfig {
public class LdapSecurityConfig {
@Value("${spring.ldap.urls}")
private String ldapUrls;
@Value("${spring.ldap.dn.pattern:#{null}}")
private String ldapUserDnPattern;
@Value("${spring.ldap.adminUser:#{null}}")
private String adminUser;
@Value("${spring.ldap.adminPassword:#{null}}")
private String adminPassword;
@Value("${spring.ldap.userFilter.searchBase:#{null}}")
private String userFilterSearchBase;
@Value("${spring.ldap.userFilter.searchFilter:#{null}}")
private String userFilterSearchFilter;
@Value("${oauth2.ldap.activeDirectory:false}")
private boolean isActiveDirectory;
@Value("${oauth2.ldap.aсtiveDirectory.domain:#{null}}")
private String activeDirectoryDomain;
private final LdapProperties props;
@Bean
public ReactiveAuthenticationManager authenticationManager(BaseLdapPathContextSource contextSource) {
public ReactiveAuthenticationManager authenticationManager(BaseLdapPathContextSource contextSource,
LdapAuthoritiesPopulator ldapAuthoritiesPopulator,
@Nullable AccessControlService acs) {
var rbacEnabled = acs != null && acs.isRbacEnabled();
BindAuthenticator ba = new BindAuthenticator(contextSource);
if (ldapUserDnPattern != null) {
ba.setUserDnPatterns(new String[] {ldapUserDnPattern});
if (props.getBase() != null) {
ba.setUserDnPatterns(new String[] {props.getBase()});
}
if (userFilterSearchFilter != null) {
if (props.getUserFilterSearchFilter() != null) {
LdapUserSearch userSearch =
new FilterBasedLdapUserSearch(userFilterSearchBase, userFilterSearchFilter, contextSource);
new FilterBasedLdapUserSearch(props.getUserFilterSearchBase(), props.getUserFilterSearchFilter(),
contextSource);
ba.setUserSearch(userSearch);
}
AbstractLdapAuthenticationProvider authenticationProvider;
if (!isActiveDirectory) {
authenticationProvider = new LdapAuthenticationProvider(ba);
if (!props.isActiveDirectory()) {
authenticationProvider = rbacEnabled
? new LdapAuthenticationProvider(ba, ldapAuthoritiesPopulator)
: new LdapAuthenticationProvider(ba);
} else {
authenticationProvider = new ActiveDirectoryLdapAuthenticationProvider(activeDirectoryDomain, ldapUrls);
authenticationProvider = new ActiveDirectoryLdapAuthenticationProvider(props.getActiveDirectoryDomain(),
props.getUrls()); // TODO Issue #3741
authenticationProvider.setUseAuthenticationRequestCredentials(true);
}
if (rbacEnabled) {
authenticationProvider.setUserDetailsContextMapper(new UserDetailsMapper());
}
AuthenticationManager am = new ProviderManager(List.of(authenticationProvider));
return new ReactiveAuthenticationManagerAdapter(am);
}
@Bean
@Primary
public BaseLdapPathContextSource contextSource() {
LdapContextSource ctx = new LdapContextSource();
ctx.setUrl(ldapUrls);
ctx.setUserDn(adminUser);
ctx.setPassword(adminPassword);
ctx.setUrl(props.getUrls());
ctx.setUserDn(props.getAdminUser());
ctx.setPassword(props.getAdminPassword());
ctx.afterPropertiesSet();
return ctx;
}
@Bean
@Primary
public LdapAuthoritiesPopulator ldapAuthoritiesPopulator(BaseLdapPathContextSource contextSource) {
var authoritiesPopulator = new DefaultLdapAuthoritiesPopulator(contextSource, props.getGroupFilterSearchBase());
authoritiesPopulator.setRolePrefix("");
authoritiesPopulator.setConvertToUpperCase(false);
return authoritiesPopulator;
}
@Bean
public SecurityWebFilterChain configureLdap(ServerHttpSecurity http) {
log.info("Configuring LDAP authentication.");
if (isActiveDirectory) {
if (props.isActiveDirectory()) {
log.info("Active Directory support for LDAP has been enabled.");
}
http
return http
.authorizeExchange()
.pathMatchers(AUTH_WHITELIST)
.permitAll()
.anyExchange()
.authenticated()
.and()
.httpBasic();
return http.csrf().disable().build();
.and()
.formLogin()
.and()
.logout()
.and()
.csrf().disable()
.build();
}
private static class UserDetailsMapper extends LdapUserDetailsMapper {
@Override
public UserDetails mapUserFromContext(DirContextOperations ctx, String username,
Collection<? extends GrantedAuthority> authorities) {
UserDetails userDetails = super.mapUserFromContext(ctx, username, authorities);
return new RbacLdapUser(userDetails);
}
}
}

View file

@ -1,6 +1,7 @@
package com.provectus.kafka.ui.config.auth;
import jakarta.annotation.PostConstruct;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
@ -14,7 +15,16 @@ public class OAuthProperties {
private Map<String, OAuth2Provider> client = new HashMap<>();
@PostConstruct
public void validate() {
public void init() {
getClient().values().forEach((provider) -> {
if (provider.getCustomParams() == null) {
provider.setCustomParams(Collections.emptyMap());
}
if (provider.getScope() == null) {
provider.setScope(Collections.emptySet());
}
});
getClient().values().forEach(this::validateProvider);
}

View file

@ -73,8 +73,7 @@ public final class OAuthPropertiesConverter {
}
private static boolean isGoogle(OAuth2Provider provider) {
return provider.getCustomParams() != null
&& GOOGLE.equalsIgnoreCase(provider.getCustomParams().get(TYPE));
return GOOGLE.equalsIgnoreCase(provider.getCustomParams().get(TYPE));
}
}

View file

@ -72,13 +72,13 @@ public class OAuthSecurityConfig extends AbstractAuthSecurityConfig {
final OidcReactiveOAuth2UserService delegate = new OidcReactiveOAuth2UserService();
return request -> delegate.loadUser(request)
.flatMap(user -> {
String providerId = request.getClientRegistration().getRegistrationId();
final var extractor = getExtractor(providerId, acs);
var provider = getProviderByProviderId(request.getClientRegistration().getRegistrationId());
final var extractor = getExtractor(provider, acs);
if (extractor == null) {
return Mono.just(user);
}
return extractor.extract(acs, user, Map.of("request", request))
return extractor.extract(acs, user, Map.of("request", request, "provider", provider))
.map(groups -> new RbacOidcUser(user, groups));
});
}
@ -88,13 +88,13 @@ public class OAuthSecurityConfig extends AbstractAuthSecurityConfig {
final DefaultReactiveOAuth2UserService delegate = new DefaultReactiveOAuth2UserService();
return request -> delegate.loadUser(request)
.flatMap(user -> {
String providerId = request.getClientRegistration().getRegistrationId();
final var extractor = getExtractor(providerId, acs);
var provider = getProviderByProviderId(request.getClientRegistration().getRegistrationId());
final var extractor = getExtractor(provider, acs);
if (extractor == null) {
return Mono.just(user);
}
return extractor.extract(acs, user, Map.of("request", request))
return extractor.extract(acs, user, Map.of("request", request, "provider", provider))
.map(groups -> new RbacOAuth2User(user, groups));
});
}
@ -113,18 +113,18 @@ public class OAuthSecurityConfig extends AbstractAuthSecurityConfig {
}
@Nullable
private ProviderAuthorityExtractor getExtractor(final String providerId, AccessControlService acs) {
final String provider = getProviderByProviderId(providerId);
Optional<ProviderAuthorityExtractor> extractor = acs.getExtractors()
private ProviderAuthorityExtractor getExtractor(final OAuthProperties.OAuth2Provider provider,
AccessControlService acs) {
Optional<ProviderAuthorityExtractor> extractor = acs.getOauthExtractors()
.stream()
.filter(e -> e.isApplicable(provider))
.filter(e -> e.isApplicable(provider.getProvider(), provider.getCustomParams()))
.findFirst();
return extractor.orElse(null);
}
private String getProviderByProviderId(final String providerId) {
return properties.getClient().get(providerId).getProvider();
private OAuthProperties.OAuth2Provider getProviderByProviderId(final String providerId) {
return properties.getClient().get(providerId);
}
}

View file

@ -0,0 +1,60 @@
package com.provectus.kafka.ui.config.auth;
import java.util.Collection;
import java.util.stream.Collectors;
import org.springframework.security.core.GrantedAuthority;
import org.springframework.security.core.userdetails.UserDetails;
public class RbacLdapUser implements UserDetails, RbacUser {
private final UserDetails userDetails;
public RbacLdapUser(UserDetails userDetails) {
this.userDetails = userDetails;
}
@Override
public String name() {
return userDetails.getUsername();
}
@Override
public Collection<String> groups() {
return userDetails.getAuthorities().stream().map(GrantedAuthority::getAuthority).collect(Collectors.toSet());
}
@Override
public Collection<? extends GrantedAuthority> getAuthorities() {
return userDetails.getAuthorities();
}
@Override
public String getPassword() {
return userDetails.getPassword();
}
@Override
public String getUsername() {
return userDetails.getUsername();
}
@Override
public boolean isAccountNonExpired() {
return userDetails.isAccountNonExpired();
}
@Override
public boolean isAccountNonLocked() {
return userDetails.isAccountNonLocked();
}
@Override
public boolean isCredentialsNonExpired() {
return userDetails.isCredentialsNonExpired();
}
@Override
public boolean isEnabled() {
return userDetails.isEnabled();
}
}

View file

@ -0,0 +1,21 @@
package com.provectus.kafka.ui.config.auth.condition;
import org.springframework.boot.autoconfigure.condition.AllNestedConditions;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
public class ActiveDirectoryCondition extends AllNestedConditions {
public ActiveDirectoryCondition() {
super(ConfigurationPhase.PARSE_CONFIGURATION);
}
@ConditionalOnProperty(value = "auth.type", havingValue = "LDAP")
public static class OnAuthType {
}
@ConditionalOnProperty(value = "${oauth2.ldap.activeDirectory}:false", havingValue = "true", matchIfMissing = false)
public static class OnActiveDirectory {
}
}

View file

@ -46,10 +46,8 @@ public class CognitoLogoutSuccessHandler implements LogoutSuccessHandler {
.fragment(null)
.build();
Assert.isTrue(
provider.getCustomParams() != null && provider.getCustomParams().containsKey("logoutUrl"),
"Custom params should contain 'logoutUrl'"
);
Assert.isTrue(provider.getCustomParams().containsKey("logoutUrl"),
"Custom params should contain 'logoutUrl'");
final var uri = UriComponentsBuilder
.fromUri(URI.create(provider.getCustomParams().get("logoutUrl")))
.queryParam("client_id", provider.getClientId())

View file

@ -0,0 +1,115 @@
package com.provectus.kafka.ui.controller;
import com.provectus.kafka.ui.api.AclsApi;
import com.provectus.kafka.ui.mapper.ClusterMapper;
import com.provectus.kafka.ui.model.KafkaAclDTO;
import com.provectus.kafka.ui.model.KafkaAclNamePatternTypeDTO;
import com.provectus.kafka.ui.model.KafkaAclResourceTypeDTO;
import com.provectus.kafka.ui.model.rbac.AccessContext;
import com.provectus.kafka.ui.model.rbac.permission.AclAction;
import com.provectus.kafka.ui.service.acl.AclsService;
import com.provectus.kafka.ui.service.rbac.AccessControlService;
import java.util.Optional;
import lombok.RequiredArgsConstructor;
import org.apache.kafka.common.resource.PatternType;
import org.apache.kafka.common.resource.ResourcePatternFilter;
import org.apache.kafka.common.resource.ResourceType;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@RestController
@RequiredArgsConstructor
public class AclsController extends AbstractController implements AclsApi {
private final AclsService aclsService;
private final AccessControlService accessControlService;
@Override
public Mono<ResponseEntity<Void>> createAcl(String clusterName, Mono<KafkaAclDTO> kafkaAclDto,
ServerWebExchange exchange) {
AccessContext context = AccessContext.builder()
.cluster(clusterName)
.aclActions(AclAction.EDIT)
.build();
return accessControlService.validateAccess(context)
.then(kafkaAclDto)
.map(ClusterMapper::toAclBinding)
.flatMap(binding -> aclsService.createAcl(getCluster(clusterName), binding))
.thenReturn(ResponseEntity.ok().build());
}
@Override
public Mono<ResponseEntity<Void>> deleteAcl(String clusterName, Mono<KafkaAclDTO> kafkaAclDto,
ServerWebExchange exchange) {
AccessContext context = AccessContext.builder()
.cluster(clusterName)
.aclActions(AclAction.EDIT)
.build();
return accessControlService.validateAccess(context)
.then(kafkaAclDto)
.map(ClusterMapper::toAclBinding)
.flatMap(binding -> aclsService.deleteAcl(getCluster(clusterName), binding))
.thenReturn(ResponseEntity.ok().build());
}
@Override
public Mono<ResponseEntity<Flux<KafkaAclDTO>>> listAcls(String clusterName,
KafkaAclResourceTypeDTO resourceTypeDto,
String resourceName,
KafkaAclNamePatternTypeDTO namePatternTypeDto,
ServerWebExchange exchange) {
AccessContext context = AccessContext.builder()
.cluster(clusterName)
.aclActions(AclAction.VIEW)
.build();
var resourceType = Optional.ofNullable(resourceTypeDto)
.map(ClusterMapper::mapAclResourceTypeDto)
.orElse(ResourceType.ANY);
var namePatternType = Optional.ofNullable(namePatternTypeDto)
.map(ClusterMapper::mapPatternTypeDto)
.orElse(PatternType.ANY);
var filter = new ResourcePatternFilter(resourceType, resourceName, namePatternType);
return accessControlService.validateAccess(context).then(
Mono.just(
ResponseEntity.ok(
aclsService.listAcls(getCluster(clusterName), filter)
.map(ClusterMapper::toKafkaAclDto)))
);
}
@Override
public Mono<ResponseEntity<String>> getAclAsCsv(String clusterName, ServerWebExchange exchange) {
AccessContext context = AccessContext.builder()
.cluster(clusterName)
.aclActions(AclAction.VIEW)
.build();
return accessControlService.validateAccess(context).then(
aclsService.getAclAsCsvString(getCluster(clusterName))
.map(ResponseEntity::ok)
.flatMap(Mono::just)
);
}
@Override
public Mono<ResponseEntity<Void>> syncAclsCsv(String clusterName, Mono<String> csvMono, ServerWebExchange exchange) {
AccessContext context = AccessContext.builder()
.cluster(clusterName)
.aclActions(AclAction.EDIT)
.build();
return accessControlService.validateAccess(context)
.then(csvMono)
.flatMap(csv -> aclsService.syncAclWithAclCsv(getCluster(clusterName), csv))
.thenReturn(ResponseEntity.ok().build());
}
}

View file

@ -27,6 +27,7 @@ import org.mapstruct.Mapper;
import org.mapstruct.factory.Mappers;
import org.springframework.http.ResponseEntity;
import org.springframework.http.codec.multipart.FilePart;
import org.springframework.http.codec.multipart.Part;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Flux;
@ -92,16 +93,19 @@ public class ApplicationConfigController implements ApplicationConfigApi {
}
@Override
public Mono<ResponseEntity<UploadedFileInfoDTO>> uploadConfigRelatedFile(FilePart file, ServerWebExchange exchange) {
public Mono<ResponseEntity<UploadedFileInfoDTO>> uploadConfigRelatedFile(Flux<Part> fileFlux,
ServerWebExchange exchange) {
return accessControlService
.validateAccess(
AccessContext.builder()
.applicationConfigActions(EDIT)
.build()
)
.then(dynamicConfigOperations.uploadConfigRelatedFile(file))
.map(path -> new UploadedFileInfoDTO().location(path.toString()))
.map(ResponseEntity::ok);
.then(fileFlux.single())
.flatMap(file ->
dynamicConfigOperations.uploadConfigRelatedFile((FilePart) file)
.map(path -> new UploadedFileInfoDTO().location(path.toString()))
.map(ResponseEntity::ok));
}
@Override

View file

@ -211,7 +211,7 @@ public class KafkaConnectController extends AbstractController implements KafkaC
Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
.cluster(clusterName)
.connect(connectName)
.connectActions(ConnectAction.VIEW, ConnectAction.EDIT)
.connectActions(ConnectAction.VIEW, ConnectAction.RESTART)
.build());
return validateAccess.then(

View file

@ -43,9 +43,6 @@ import reactor.core.scheduler.Schedulers;
@Slf4j
public class MessagesController extends AbstractController implements MessagesApi {
private static final int MAX_LOAD_RECORD_LIMIT = 100;
private static final int DEFAULT_LOAD_RECORD_LIMIT = 20;
private final MessagesService messagesService;
private final DeserializationService deserializationService;
private final AccessControlService accessControlService;
@ -91,8 +88,6 @@ public class MessagesController extends AbstractController implements MessagesAp
seekType = seekType != null ? seekType : SeekTypeDTO.BEGINNING;
seekDirection = seekDirection != null ? seekDirection : SeekDirectionDTO.FORWARD;
filterQueryType = filterQueryType != null ? filterQueryType : MessageFilterTypeDTO.STRING_CONTAINS;
int recordsLimit =
Optional.ofNullable(limit).map(s -> Math.min(s, MAX_LOAD_RECORD_LIMIT)).orElse(DEFAULT_LOAD_RECORD_LIMIT);
var positions = new ConsumerPosition(
seekType,
@ -103,7 +98,7 @@ public class MessagesController extends AbstractController implements MessagesAp
ResponseEntity.ok(
messagesService.loadMessages(
getCluster(clusterName), topicName, positions, q, filterQueryType,
recordsLimit, seekDirection, keySerde, valueSerde)
limit, seekDirection, keySerde, valueSerde)
)
);

View file

@ -39,41 +39,42 @@ public class MessageFilters {
}
static Predicate<TopicMessageDTO> groovyScriptFilter(String script) {
var compiledScript = compileScript(script);
var engine = getGroovyEngine();
var compiledScript = compileScript(engine, script);
var jsonSlurper = new JsonSlurper();
return new Predicate<TopicMessageDTO>() {
@SneakyThrows
@Override
public boolean test(TopicMessageDTO msg) {
var bindings = getGroovyEngine().createBindings();
var bindings = engine.createBindings();
bindings.put("partition", msg.getPartition());
bindings.put("offset", msg.getOffset());
bindings.put("timestampMs", msg.getTimestamp().toInstant().toEpochMilli());
bindings.put("keyAsText", msg.getKey());
bindings.put("valueAsText", msg.getContent());
bindings.put("headers", msg.getHeaders());
bindings.put("key", parseToJsonOrReturnNull(jsonSlurper, msg.getKey()));
bindings.put("value", parseToJsonOrReturnNull(jsonSlurper, msg.getContent()));
bindings.put("key", parseToJsonOrReturnAsIs(jsonSlurper, msg.getKey()));
bindings.put("value", parseToJsonOrReturnAsIs(jsonSlurper, msg.getContent()));
var result = compiledScript.eval(bindings);
if (result instanceof Boolean) {
return (Boolean) result;
} else {
throw new ValidationException(
String.format("Unexpected script result: %s, Boolean should be returned instead", result));
"Unexpected script result: %s, Boolean should be returned instead".formatted(result));
}
}
};
}
@Nullable
private static Object parseToJsonOrReturnNull(JsonSlurper parser, @Nullable String str) {
private static Object parseToJsonOrReturnAsIs(JsonSlurper parser, @Nullable String str) {
if (str == null) {
return null;
}
try {
return parser.parseText(str);
} catch (Exception e) {
return null;
return str;
}
}
@ -86,9 +87,9 @@ public class MessageFilters {
return GROOVY_ENGINE;
}
private static CompiledScript compileScript(String script) {
private static CompiledScript compileScript(GroovyScriptEngineImpl engine, String script) {
try {
return getGroovyEngine().compile(script);
return engine.compile(script);
} catch (ScriptException e) {
throw new ValidationException("Script syntax error: " + e.getMessage());
}

View file

@ -20,6 +20,9 @@ import com.provectus.kafka.ui.model.InternalPartition;
import com.provectus.kafka.ui.model.InternalReplica;
import com.provectus.kafka.ui.model.InternalTopic;
import com.provectus.kafka.ui.model.InternalTopicConfig;
import com.provectus.kafka.ui.model.KafkaAclDTO;
import com.provectus.kafka.ui.model.KafkaAclNamePatternTypeDTO;
import com.provectus.kafka.ui.model.KafkaAclResourceTypeDTO;
import com.provectus.kafka.ui.model.MetricDTO;
import com.provectus.kafka.ui.model.Metrics;
import com.provectus.kafka.ui.model.PartitionDTO;
@ -27,12 +30,18 @@ import com.provectus.kafka.ui.model.ReplicaDTO;
import com.provectus.kafka.ui.model.TopicConfigDTO;
import com.provectus.kafka.ui.model.TopicDTO;
import com.provectus.kafka.ui.model.TopicDetailsDTO;
import com.provectus.kafka.ui.service.masking.DataMasking;
import com.provectus.kafka.ui.service.metrics.RawMetric;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.acl.AccessControlEntry;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.acl.AclPermissionType;
import org.apache.kafka.common.resource.PatternType;
import org.apache.kafka.common.resource.ResourcePattern;
import org.apache.kafka.common.resource.ResourceType;
import org.mapstruct.Mapper;
import org.mapstruct.Mapping;
@ -109,8 +118,74 @@ public interface ClusterMapper {
return brokerDiskUsage;
}
default DataMasking map(List<ClustersProperties.Masking> maskingProperties) {
return DataMasking.create(maskingProperties);
static KafkaAclDTO.OperationEnum mapAclOperation(AclOperation operation) {
return switch (operation) {
case ALL -> KafkaAclDTO.OperationEnum.ALL;
case READ -> KafkaAclDTO.OperationEnum.READ;
case WRITE -> KafkaAclDTO.OperationEnum.WRITE;
case CREATE -> KafkaAclDTO.OperationEnum.CREATE;
case DELETE -> KafkaAclDTO.OperationEnum.DELETE;
case ALTER -> KafkaAclDTO.OperationEnum.ALTER;
case DESCRIBE -> KafkaAclDTO.OperationEnum.DESCRIBE;
case CLUSTER_ACTION -> KafkaAclDTO.OperationEnum.CLUSTER_ACTION;
case DESCRIBE_CONFIGS -> KafkaAclDTO.OperationEnum.DESCRIBE_CONFIGS;
case ALTER_CONFIGS -> KafkaAclDTO.OperationEnum.ALTER_CONFIGS;
case IDEMPOTENT_WRITE -> KafkaAclDTO.OperationEnum.IDEMPOTENT_WRITE;
case CREATE_TOKENS -> KafkaAclDTO.OperationEnum.CREATE_TOKENS;
case DESCRIBE_TOKENS -> KafkaAclDTO.OperationEnum.DESCRIBE_TOKENS;
case ANY -> throw new IllegalArgumentException("ANY operation can be only part of filter");
case UNKNOWN -> KafkaAclDTO.OperationEnum.UNKNOWN;
};
}
static KafkaAclResourceTypeDTO mapAclResourceType(ResourceType resourceType) {
return switch (resourceType) {
case CLUSTER -> KafkaAclResourceTypeDTO.CLUSTER;
case TOPIC -> KafkaAclResourceTypeDTO.TOPIC;
case GROUP -> KafkaAclResourceTypeDTO.GROUP;
case DELEGATION_TOKEN -> KafkaAclResourceTypeDTO.DELEGATION_TOKEN;
case TRANSACTIONAL_ID -> KafkaAclResourceTypeDTO.TRANSACTIONAL_ID;
case USER -> KafkaAclResourceTypeDTO.USER;
case ANY -> throw new IllegalArgumentException("ANY type can be only part of filter");
case UNKNOWN -> KafkaAclResourceTypeDTO.UNKNOWN;
};
}
static ResourceType mapAclResourceTypeDto(KafkaAclResourceTypeDTO dto) {
return ResourceType.valueOf(dto.name());
}
static PatternType mapPatternTypeDto(KafkaAclNamePatternTypeDTO dto) {
return PatternType.valueOf(dto.name());
}
static AclBinding toAclBinding(KafkaAclDTO dto) {
return new AclBinding(
new ResourcePattern(
mapAclResourceTypeDto(dto.getResourceType()),
dto.getResourceName(),
mapPatternTypeDto(dto.getNamePatternType())
),
new AccessControlEntry(
dto.getPrincipal(),
dto.getHost(),
AclOperation.valueOf(dto.getOperation().name()),
AclPermissionType.valueOf(dto.getPermission().name())
)
);
}
static KafkaAclDTO toKafkaAclDto(AclBinding binding) {
var pattern = binding.pattern();
var filter = binding.toFilter().entryFilter();
return new KafkaAclDTO()
.resourceType(mapAclResourceType(pattern.resourceType()))
.resourceName(pattern.name())
.namePatternType(KafkaAclNamePatternTypeDTO.fromValue(pattern.patternType().name()))
.principal(filter.principal())
.host(filter.host())
.operation(mapAclOperation(filter.operation()))
.permission(KafkaAclDTO.PermissionEnum.fromValue(filter.permissionType().name()));
}
}

View file

@ -11,8 +11,6 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
@ -82,15 +80,8 @@ public class ConsumerGroupMapper {
InternalConsumerGroup c, T consumerGroup) {
consumerGroup.setGroupId(c.getGroupId());
consumerGroup.setMembers(c.getMembers().size());
int numTopics = Stream.concat(
c.getOffsets().keySet().stream().map(TopicPartition::topic),
c.getMembers().stream()
.flatMap(m -> m.getAssignment().stream().map(TopicPartition::topic))
).collect(Collectors.toSet()).size();
consumerGroup.setMessagesBehind(c.getMessagesBehind());
consumerGroup.setTopics(numTopics);
consumerGroup.setTopics(c.getTopicNum());
consumerGroup.setSimple(c.isSimple());
Optional.ofNullable(c.getState())

View file

@ -4,5 +4,7 @@ public enum ClusterFeature {
KAFKA_CONNECT,
KSQL_DB,
SCHEMA_REGISTRY,
TOPIC_DELETION
TOPIC_DELETION,
KAFKA_ACL_VIEW,
KAFKA_ACL_EDIT
}

View file

@ -1,6 +1,7 @@
package com.provectus.kafka.ui.model;
import java.math.BigDecimal;
import javax.annotation.Nullable;
import lombok.Data;
import org.apache.kafka.common.Node;
@ -10,15 +11,27 @@ public class InternalBroker {
private final Integer id;
private final String host;
private final Integer port;
private final BigDecimal bytesInPerSec;
private final BigDecimal bytesOutPerSec;
private final @Nullable BigDecimal bytesInPerSec;
private final @Nullable BigDecimal bytesOutPerSec;
private final @Nullable Integer partitionsLeader;
private final @Nullable Integer partitions;
private final @Nullable Integer inSyncPartitions;
private final @Nullable BigDecimal leadersSkew;
private final @Nullable BigDecimal partitionsSkew;
public InternalBroker(Node node, Statistics statistics) {
public InternalBroker(Node node,
PartitionDistributionStats partitionDistribution,
Statistics statistics) {
this.id = node.id();
this.host = node.host();
this.port = node.port();
this.bytesInPerSec = statistics.getMetrics().getBrokerBytesInPerSec().get(node.id());
this.bytesOutPerSec = statistics.getMetrics().getBrokerBytesOutPerSec().get(node.id());
this.partitionsLeader = partitionDistribution.getPartitionLeaders().get(node);
this.partitions = partitionDistribution.getPartitionsCount().get(node);
this.inSyncPartitions = partitionDistribution.getInSyncPartitions().get(node);
this.leadersSkew = partitionDistribution.leadersSkew(node);
this.partitionsSkew = partitionDistribution.partitionsSkew(node);
}
}

View file

@ -5,6 +5,7 @@ import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import lombok.Builder;
import lombok.Data;
import org.apache.kafka.clients.admin.ConsumerGroupDescription;
@ -21,6 +22,7 @@ public class InternalConsumerGroup {
private final Map<TopicPartition, Long> offsets;
private final Map<TopicPartition, Long> endOffsets;
private final Long messagesBehind;
private final Integer topicNum;
private final String partitionAssignor;
private final ConsumerGroupState state;
private final Node coordinator;
@ -44,22 +46,12 @@ public class InternalConsumerGroup {
builder.simple(description.isSimpleConsumerGroup());
builder.state(description.state());
builder.partitionAssignor(description.partitionAssignor());
builder.members(
description.members().stream()
.map(m ->
InternalConsumerGroup.InternalMember.builder()
.assignment(m.assignment().topicPartitions())
.clientId(m.clientId())
.groupInstanceId(m.groupInstanceId().orElse(""))
.consumerId(m.consumerId())
.clientId(m.clientId())
.host(m.host())
.build()
).collect(Collectors.toList())
);
Collection<InternalMember> internalMembers = initInternalMembers(description);
builder.members(internalMembers);
builder.offsets(groupOffsets);
builder.endOffsets(topicEndOffsets);
builder.messagesBehind(calculateMessagesBehind(groupOffsets, topicEndOffsets));
builder.topicNum(calculateTopicNum(groupOffsets, internalMembers));
Optional.ofNullable(description.coordinator()).ifPresent(builder::coordinator);
return builder.build();
}
@ -80,4 +72,31 @@ public class InternalConsumerGroup {
return messagesBehind;
}
private static Integer calculateTopicNum(Map<TopicPartition, Long> offsets, Collection<InternalMember> members) {
long topicNum = Stream.concat(
offsets.keySet().stream().map(TopicPartition::topic),
members.stream()
.flatMap(m -> m.getAssignment().stream().map(TopicPartition::topic))
).distinct().count();
return Integer.valueOf((int) topicNum);
}
private static Collection<InternalMember> initInternalMembers(ConsumerGroupDescription description) {
return description.members().stream()
.map(m ->
InternalConsumerGroup.InternalMember.builder()
.assignment(m.assignment().topicPartitions())
.clientId(m.clientId())
.groupInstanceId(m.groupInstanceId().orElse(""))
.consumerId(m.consumerId())
.clientId(m.clientId())
.host(m.host())
.build()
).collect(Collectors.toList());
}
}

View file

@ -0,0 +1,92 @@
package com.provectus.kafka.ui.model;
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.HashMap;
import java.util.Map;
import javax.annotation.Nullable;
import lombok.AccessLevel;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartitionInfo;
@RequiredArgsConstructor(access = AccessLevel.PRIVATE)
@Getter
@Slf4j
public class PartitionDistributionStats {
// avg skew will show unuseful results on low number of partitions
private static final int MIN_PARTITIONS_FOR_SKEW_CALCULATION = 50;
private final Map<Node, Integer> partitionLeaders;
private final Map<Node, Integer> partitionsCount;
private final Map<Node, Integer> inSyncPartitions;
private final double avgLeadersCntPerBroker;
private final double avgPartitionsPerBroker;
private final boolean skewCanBeCalculated;
public static PartitionDistributionStats create(Statistics stats) {
return create(stats, MIN_PARTITIONS_FOR_SKEW_CALCULATION);
}
static PartitionDistributionStats create(Statistics stats, int minPartitionsForSkewCalculation) {
var partitionLeaders = new HashMap<Node, Integer>();
var partitionsReplicated = new HashMap<Node, Integer>();
var isr = new HashMap<Node, Integer>();
int partitionsCnt = 0;
for (TopicDescription td : stats.getTopicDescriptions().values()) {
for (TopicPartitionInfo tp : td.partitions()) {
partitionsCnt++;
tp.replicas().forEach(r -> incr(partitionsReplicated, r));
tp.isr().forEach(r -> incr(isr, r));
if (tp.leader() != null) {
incr(partitionLeaders, tp.leader());
}
}
}
int nodesWithPartitions = partitionsReplicated.size();
int partitionReplications = partitionsReplicated.values().stream().mapToInt(i -> i).sum();
var avgPartitionsPerBroker = nodesWithPartitions == 0 ? 0 : ((double) partitionReplications) / nodesWithPartitions;
int nodesWithLeaders = partitionLeaders.size();
int leadersCnt = partitionLeaders.values().stream().mapToInt(i -> i).sum();
var avgLeadersCntPerBroker = nodesWithLeaders == 0 ? 0 : ((double) leadersCnt) / nodesWithLeaders;
return new PartitionDistributionStats(
partitionLeaders,
partitionsReplicated,
isr,
avgLeadersCntPerBroker,
avgPartitionsPerBroker,
partitionsCnt >= minPartitionsForSkewCalculation
);
}
private static void incr(Map<Node, Integer> map, Node n) {
map.compute(n, (k, c) -> c == null ? 1 : ++c);
}
@Nullable
public BigDecimal partitionsSkew(Node node) {
return calculateAvgSkew(partitionsCount.get(node), avgPartitionsPerBroker);
}
@Nullable
public BigDecimal leadersSkew(Node node) {
return calculateAvgSkew(partitionLeaders.get(node), avgLeadersCntPerBroker);
}
// Returns difference (in percents) from average value, null if it can't be calculated
@Nullable
private BigDecimal calculateAvgSkew(@Nullable Integer value, double avgValue) {
if (avgValue == 0 || !skewCanBeCalculated) {
return null;
}
value = value == null ? 0 : value;
return new BigDecimal((value - avgValue) / avgValue * 100.0)
.setScale(1, RoundingMode.HALF_UP);
}
}

View file

@ -1,5 +1,6 @@
package com.provectus.kafka.ui.model.rbac;
import com.provectus.kafka.ui.model.rbac.permission.AclAction;
import com.provectus.kafka.ui.model.rbac.permission.ApplicationConfigAction;
import com.provectus.kafka.ui.model.rbac.permission.ClusterConfigAction;
import com.provectus.kafka.ui.model.rbac.permission.ConnectAction;
@ -37,6 +38,8 @@ public class AccessContext {
Collection<KsqlAction> ksqlActions;
Collection<AclAction> aclActions;
public static AccessContextBuilder builder() {
return new AccessContextBuilder();
}
@ -55,6 +58,7 @@ public class AccessContext {
private String schema;
private Collection<SchemaAction> schemaActions = Collections.emptySet();
private Collection<KsqlAction> ksqlActions = Collections.emptySet();
private Collection<AclAction> aclActions = Collections.emptySet();
private AccessContextBuilder() {
}
@ -131,6 +135,12 @@ public class AccessContext {
return this;
}
public AccessContextBuilder aclActions(AclAction... actions) {
Assert.isTrue(actions.length > 0, "actions not present");
this.aclActions = List.of(actions);
return this;
}
public AccessContext build() {
return new AccessContext(
applicationConfigActions,
@ -140,7 +150,7 @@ public class AccessContext {
connect, connectActions,
connector,
schema, schemaActions,
ksqlActions);
ksqlActions, aclActions);
}
}
}

View file

@ -1,8 +1,10 @@
package com.provectus.kafka.ui.model.rbac;
import static com.provectus.kafka.ui.model.rbac.Resource.APPLICATIONCONFIG;
import static com.provectus.kafka.ui.model.rbac.Resource.CLUSTERCONFIG;
import static com.provectus.kafka.ui.model.rbac.Resource.KSQL;
import com.provectus.kafka.ui.model.rbac.permission.AclAction;
import com.provectus.kafka.ui.model.rbac.permission.ApplicationConfigAction;
import com.provectus.kafka.ui.model.rbac.permission.ClusterConfigAction;
import com.provectus.kafka.ui.model.rbac.permission.ConnectAction;
@ -25,6 +27,8 @@ import org.springframework.util.Assert;
@EqualsAndHashCode
public class Permission {
private static final List<Resource> RBAC_ACTION_EXEMPT_LIST = List.of(KSQL, CLUSTERCONFIG, APPLICATIONCONFIG);
Resource resource;
List<String> actions;
@ -50,7 +54,7 @@ public class Permission {
public void validate() {
Assert.notNull(resource, "resource cannot be null");
if (!List.of(KSQL, CLUSTERCONFIG).contains(this.resource)) {
if (!RBAC_ACTION_EXEMPT_LIST.contains(this.resource)) {
Assert.notNull(value, "permission value can't be empty for resource " + resource);
}
}
@ -73,6 +77,7 @@ public class Permission {
case SCHEMA -> Arrays.stream(SchemaAction.values()).map(Enum::toString).toList();
case CONNECT -> Arrays.stream(ConnectAction.values()).map(Enum::toString).toList();
case KSQL -> Arrays.stream(KsqlAction.values()).map(Enum::toString).toList();
case ACL -> Arrays.stream(AclAction.values()).map(Enum::toString).toList();
};
}

View file

@ -11,7 +11,8 @@ public enum Resource {
CONSUMER,
SCHEMA,
CONNECT,
KSQL;
KSQL,
ACL;
@Nullable
public static Resource fromString(String name) {

View file

@ -0,0 +1,15 @@
package com.provectus.kafka.ui.model.rbac.permission;
import org.apache.commons.lang3.EnumUtils;
import org.jetbrains.annotations.Nullable;
public enum AclAction implements PermissibleAction {
VIEW,
EDIT;
@Nullable
public static AclAction fromString(String name) {
return EnumUtils.getEnum(AclAction.class, name);
}
}

View file

@ -7,7 +7,8 @@ public enum ConnectAction implements PermissibleAction {
VIEW,
EDIT,
CREATE
CREATE,
RESTART
;

View file

@ -10,6 +10,8 @@ public enum Provider {
OAUTH_COGNITO,
OAUTH,
LDAP,
LDAP_AD;
@ -22,6 +24,8 @@ public enum Provider {
public static String GOOGLE = "google";
public static String GITHUB = "github";
public static String COGNITO = "cognito";
public static String OAUTH = "oauth";
}
}

View file

@ -123,11 +123,11 @@ public class ConsumerRecordDeserializer {
}
private static Long getKeySize(ConsumerRecord<Bytes, Bytes> consumerRecord) {
return consumerRecord.key() != null ? (long) consumerRecord.key().get().length : null;
return consumerRecord.key() != null ? (long) consumerRecord.serializedKeySize() : null;
}
private static Long getValueSize(ConsumerRecord<Bytes, Bytes> consumerRecord) {
return consumerRecord.value() != null ? (long) consumerRecord.value().get().length : null;
return consumerRecord.value() != null ? (long) consumerRecord.serializedValueSize() : null;
}
private static int headerSize(Header header) {

View file

@ -122,8 +122,6 @@ public class SerdesInitializer {
registeredSerdes,
Optional.ofNullable(clusterProperties.getDefaultKeySerde())
.map(name -> Preconditions.checkNotNull(registeredSerdes.get(name), "Default key serde not found"))
.or(() -> Optional.ofNullable(registeredSerdes.get(SchemaRegistrySerde.name())))
.or(() -> Optional.ofNullable(registeredSerdes.get(ProtobufFileSerde.name())))
.orElse(null),
Optional.ofNullable(clusterProperties.getDefaultValueSerde())
.map(name -> Preconditions.checkNotNull(registeredSerdes.get(name), "Default value serde not found"))

View file

@ -1,33 +1,36 @@
package com.provectus.kafka.ui.service;
import com.provectus.kafka.ui.config.ClustersProperties;
import com.provectus.kafka.ui.model.KafkaCluster;
import com.provectus.kafka.ui.util.SslPropertiesUtil;
import java.io.Closeable;
import java.time.Instant;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import lombok.RequiredArgsConstructor;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Mono;
@Service
@RequiredArgsConstructor
@Slf4j
public class AdminClientServiceImpl implements AdminClientService, Closeable {
private static final int DEFAULT_CLIENT_TIMEOUT_MS = 30_000;
private static final AtomicLong CLIENT_ID_SEQ = new AtomicLong();
private final Map<String, ReactiveAdminClient> adminClientCache = new ConcurrentHashMap<>();
@Setter // used in tests
@Value("${kafka.admin-client-timeout:30000}")
private int clientTimeout;
private final int clientTimeout;
public AdminClientServiceImpl(ClustersProperties clustersProperties) {
this.clientTimeout = Optional.ofNullable(clustersProperties.getAdminClientTimeout())
.orElse(DEFAULT_CLIENT_TIMEOUT_MS);
}
@Override
public Mono<ReactiveAdminClient> get(KafkaCluster cluster) {
@ -42,7 +45,7 @@ public class AdminClientServiceImpl implements AdminClientService, Closeable {
SslPropertiesUtil.addKafkaSslProperties(cluster.getOriginalProperties().getSsl(), properties);
properties.putAll(cluster.getProperties());
properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.getBootstrapServers());
properties.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, clientTimeout);
properties.putIfAbsent(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, clientTimeout);
properties.putIfAbsent(
AdminClientConfig.CLIENT_ID_CONFIG,
"kafka-ui-admin-" + Instant.now().getEpochSecond() + "-" + CLIENT_ID_SEQ.incrementAndGet()

View file

@ -10,6 +10,7 @@ import com.provectus.kafka.ui.model.BrokersLogdirsDTO;
import com.provectus.kafka.ui.model.InternalBroker;
import com.provectus.kafka.ui.model.InternalBrokerConfig;
import com.provectus.kafka.ui.model.KafkaCluster;
import com.provectus.kafka.ui.model.PartitionDistributionStats;
import com.provectus.kafka.ui.service.metrics.RawMetric;
import java.util.Collections;
import java.util.HashMap;
@ -64,11 +65,13 @@ public class BrokerService {
}
public Flux<InternalBroker> getBrokers(KafkaCluster cluster) {
var stats = statisticsCache.get(cluster);
var partitionsDistribution = PartitionDistributionStats.create(stats);
return adminClientService
.get(cluster)
.flatMap(ReactiveAdminClient::describeCluster)
.map(description -> description.getNodes().stream()
.map(node -> new InternalBroker(node, statisticsCache.get(cluster)))
.map(node -> new InternalBroker(node, partitionsDistribution, stats))
.collect(Collectors.toList()))
.flatMapMany(Flux::fromIterable);
}

View file

@ -101,6 +101,9 @@ public class ConsumerGroupService {
public record ConsumerGroupsPage(List<InternalConsumerGroup> consumerGroups, int totalPages) {
}
private record GroupWithDescr(InternalConsumerGroup icg, ConsumerGroupDescription cgd) {
}
public Mono<ConsumerGroupsPage> getConsumerGroupsPage(
KafkaCluster cluster,
int pageNum,
@ -159,22 +162,19 @@ public class ConsumerGroupService {
sortAndPaginate(descriptions.values(), comparator, pageNum, perPage, sortOrderDto).toList());
}
case MESSAGES_BEHIND -> {
record GroupWithDescr(InternalConsumerGroup icg, ConsumerGroupDescription cgd) { }
Comparator<GroupWithDescr> comparator = Comparator.comparingLong(gwd ->
gwd.icg.getMessagesBehind() == null ? 0L : gwd.icg.getMessagesBehind());
var groupNames = groups.stream().map(ConsumerGroupListing::groupId).toList();
yield loadDescriptionsByInternalConsumerGroups(ac, groups, comparator, pageNum, perPage, sortOrderDto);
}
case TOPIC_NUM -> {
Comparator<GroupWithDescr> comparator = Comparator.comparingInt(gwd -> gwd.icg.getTopicNum());
yield loadDescriptionsByInternalConsumerGroups(ac, groups, comparator, pageNum, perPage, sortOrderDto);
yield ac.describeConsumerGroups(groupNames)
.flatMap(descriptionsMap -> {
List<ConsumerGroupDescription> descriptions = descriptionsMap.values().stream().toList();
return getConsumerGroups(ac, descriptions)
.map(icg -> Streams.zip(icg.stream(), descriptions.stream(), GroupWithDescr::new).toList())
.map(gwd -> sortAndPaginate(gwd, comparator, pageNum, perPage, sortOrderDto)
.map(GroupWithDescr::cgd).toList());
}
);
}
};
}
@ -209,6 +209,27 @@ public class ConsumerGroupService {
.map(cgs -> new ArrayList<>(cgs.values()));
}
private Mono<List<ConsumerGroupDescription>> loadDescriptionsByInternalConsumerGroups(ReactiveAdminClient ac,
List<ConsumerGroupListing> groups,
Comparator<GroupWithDescr> comparator,
int pageNum,
int perPage,
SortOrderDTO sortOrderDto) {
var groupNames = groups.stream().map(ConsumerGroupListing::groupId).toList();
return ac.describeConsumerGroups(groupNames)
.flatMap(descriptionsMap -> {
List<ConsumerGroupDescription> descriptions = descriptionsMap.values().stream().toList();
return getConsumerGroups(ac, descriptions)
.map(icg -> Streams.zip(icg.stream(), descriptions.stream(), GroupWithDescr::new).toList())
.map(gwd -> sortAndPaginate(gwd, comparator, pageNum, perPage, sortOrderDto)
.map(GroupWithDescr::cgd).toList());
}
);
}
public Mono<InternalConsumerGroup> getConsumerGroupDetail(KafkaCluster cluster,
String consumerGroupId) {
return adminClientService.get(cluster)

View file

@ -2,16 +2,16 @@ package com.provectus.kafka.ui.service;
import com.provectus.kafka.ui.model.ClusterFeature;
import com.provectus.kafka.ui.model.KafkaCluster;
import com.provectus.kafka.ui.service.ReactiveAdminClient.ClusterDescription;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.Predicate;
import javax.annotation.Nullable;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.acl.AclOperation;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@ -21,12 +21,11 @@ import reactor.core.publisher.Mono;
@Slf4j
public class FeatureService {
private static final String DELETE_TOPIC_ENABLED_SERVER_PROPERTY = "delete.topic.enable";
private final AdminClientService adminClientService;
public Mono<List<ClusterFeature>> getAvailableFeatures(KafkaCluster cluster,
ReactiveAdminClient.ClusterDescription clusterDescription) {
public Mono<List<ClusterFeature>> getAvailableFeatures(ReactiveAdminClient adminClient,
KafkaCluster cluster,
ClusterDescription clusterDescription) {
List<Mono<ClusterFeature>> features = new ArrayList<>();
if (Optional.ofNullable(cluster.getConnectsClients())
@ -43,26 +42,32 @@ public class FeatureService {
features.add(Mono.just(ClusterFeature.SCHEMA_REGISTRY));
}
features.add(topicDeletionEnabled(cluster, clusterDescription.getController()));
features.add(topicDeletionEnabled(adminClient));
features.add(aclView(cluster));
features.add(aclEdit(clusterDescription));
return Flux.fromIterable(features).flatMap(m -> m).collectList();
}
private Mono<ClusterFeature> topicDeletionEnabled(KafkaCluster cluster, @Nullable Node controller) {
if (controller == null) {
return Mono.just(ClusterFeature.TOPIC_DELETION); // assuming it is enabled by default
}
return adminClientService.get(cluster)
.flatMap(ac -> ac.loadBrokersConfig(List.of(controller.id())))
.map(config ->
config.values().stream()
.flatMap(Collection::stream)
.filter(e -> e.name().equals(DELETE_TOPIC_ENABLED_SERVER_PROPERTY))
.map(e -> Boolean.parseBoolean(e.value()))
.findFirst()
.orElse(true))
.flatMap(enabled -> enabled
? Mono.just(ClusterFeature.TOPIC_DELETION)
: Mono.empty());
private Mono<ClusterFeature> topicDeletionEnabled(ReactiveAdminClient adminClient) {
return adminClient.isTopicDeletionEnabled()
? Mono.just(ClusterFeature.TOPIC_DELETION)
: Mono.empty();
}
private Mono<ClusterFeature> aclEdit(ClusterDescription clusterDescription) {
var authorizedOps = Optional.ofNullable(clusterDescription.getAuthorizedOperations()).orElse(Set.of());
boolean canEdit = authorizedOps.contains(AclOperation.ALL) || authorizedOps.contains(AclOperation.ALTER);
return canEdit
? Mono.just(ClusterFeature.KAFKA_ACL_EDIT)
: Mono.empty();
}
private Mono<ClusterFeature> aclView(KafkaCluster cluster) {
return adminClientService.get(cluster).flatMap(
ac -> ac.getClusterFeatures().contains(ReactiveAdminClient.SupportedFeature.AUTHORIZED_SECURITY_ENABLED)
? Mono.just(ClusterFeature.KAFKA_ACL_VIEW)
: Mono.empty()
);
}
}

View file

@ -2,6 +2,7 @@ package com.provectus.kafka.ui.service;
import com.provectus.kafka.ui.client.RetryingKafkaConnectClient;
import com.provectus.kafka.ui.config.ClustersProperties;
import com.provectus.kafka.ui.config.WebclientProperties;
import com.provectus.kafka.ui.connect.api.KafkaConnectClientApi;
import com.provectus.kafka.ui.emitter.PollingSettings;
import com.provectus.kafka.ui.model.ApplicationPropertyValidationDTO;
@ -22,9 +23,7 @@ import java.util.Optional;
import java.util.Properties;
import java.util.stream.Stream;
import javax.annotation.Nullable;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import org.springframework.util.unit.DataSize;
import org.springframework.web.reactive.function.client.WebClient;
@ -34,12 +33,18 @@ import reactor.util.function.Tuple2;
import reactor.util.function.Tuples;
@Service
@RequiredArgsConstructor
@Slf4j
public class KafkaClusterFactory {
@Value("${webclient.max-in-memory-buffer-size:20MB}")
private DataSize maxBuffSize;
private static final DataSize DEFAULT_WEBCLIENT_BUFFER = DataSize.parse("20MB");
private final DataSize webClientMaxBuffSize;
public KafkaClusterFactory(WebclientProperties webclientProperties) {
this.webClientMaxBuffSize = Optional.ofNullable(webclientProperties.getMaxInMemoryBufferSize())
.map(DataSize::parse)
.orElse(DEFAULT_WEBCLIENT_BUFFER);
}
public KafkaCluster create(ClustersProperties properties,
ClustersProperties.Cluster clusterProperties) {
@ -140,7 +145,7 @@ public class KafkaClusterFactory {
url -> new RetryingKafkaConnectClient(
connectCluster.toBuilder().address(url).build(),
cluster.getSsl(),
maxBuffSize
webClientMaxBuffSize
),
ReactiveFailover.CONNECTION_REFUSED_EXCEPTION_FILTER,
"No alive connect instances available",
@ -158,7 +163,7 @@ public class KafkaClusterFactory {
WebClient webClient = new WebClientConfigurator()
.configureSsl(clusterProperties.getSsl(), clusterProperties.getSchemaRegistrySsl())
.configureBasicAuth(auth.getUsername(), auth.getPassword())
.configureBufferSize(maxBuffSize)
.configureBufferSize(webClientMaxBuffSize)
.build();
return ReactiveFailover.create(
parseUrlList(clusterProperties.getSchemaRegistry()),
@ -181,7 +186,7 @@ public class KafkaClusterFactory {
clusterProperties.getKsqldbServerAuth(),
clusterProperties.getSsl(),
clusterProperties.getKsqldbServerSsl(),
maxBuffSize
webClientMaxBuffSize
),
ReactiveFailover.CONNECTION_REFUSED_EXCEPTION_FILTER,
"No live ksqldb instances available",

View file

@ -109,6 +109,7 @@ public class KafkaConnectService {
private Stream<String> getStringsForSearch(FullConnectorInfoDTO fullConnectorInfo) {
return Stream.of(
fullConnectorInfo.getName(),
fullConnectorInfo.getConnect(),
fullConnectorInfo.getStatus().getState().getValue(),
fullConnectorInfo.getType().getValue());
}

View file

@ -1,6 +1,7 @@
package com.provectus.kafka.ui.service;
import com.google.common.util.concurrent.RateLimiter;
import com.provectus.kafka.ui.config.ClustersProperties;
import com.provectus.kafka.ui.emitter.BackwardRecordEmitter;
import com.provectus.kafka.ui.emitter.ForwardRecordEmitter;
import com.provectus.kafka.ui.emitter.MessageFilters;
@ -20,13 +21,13 @@ import com.provectus.kafka.ui.serdes.ProducerRecordCreator;
import com.provectus.kafka.ui.util.SslPropertiesUtil;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.function.Predicate;
import java.util.function.UnaryOperator;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.admin.OffsetSpec;
@ -44,16 +45,35 @@ import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
@Service
@RequiredArgsConstructor
@Slf4j
public class MessagesService {
private static final int DEFAULT_MAX_PAGE_SIZE = 500;
private static final int DEFAULT_PAGE_SIZE = 100;
// limiting UI messages rate to 20/sec in tailing mode
public static final int TAILING_UI_MESSAGE_THROTTLE_RATE = 20;
private static final int TAILING_UI_MESSAGE_THROTTLE_RATE = 20;
private final AdminClientService adminClientService;
private final DeserializationService deserializationService;
private final ConsumerGroupService consumerGroupService;
private final int maxPageSize;
private final int defaultPageSize;
public MessagesService(AdminClientService adminClientService,
DeserializationService deserializationService,
ConsumerGroupService consumerGroupService,
ClustersProperties properties) {
this.adminClientService = adminClientService;
this.deserializationService = deserializationService;
this.consumerGroupService = consumerGroupService;
var pollingProps = Optional.ofNullable(properties.getPolling())
.orElseGet(ClustersProperties.PollingProperties::new);
this.maxPageSize = Optional.ofNullable(pollingProps.getMaxPageSize())
.orElse(DEFAULT_MAX_PAGE_SIZE);
this.defaultPageSize = Optional.ofNullable(pollingProps.getDefaultPageSize())
.orElse(DEFAULT_PAGE_SIZE);
}
private Mono<TopicDescription> withExistingTopic(KafkaCluster cluster, String topicName) {
return adminClientService.get(cluster)
@ -139,7 +159,7 @@ public class MessagesService {
ConsumerPosition consumerPosition,
@Nullable String query,
MessageFilterTypeDTO filterQueryType,
int limit,
@Nullable Integer pageSize,
SeekDirectionDTO seekDirection,
@Nullable String keySerde,
@Nullable String valueSerde) {
@ -147,7 +167,13 @@ public class MessagesService {
.flux()
.publishOn(Schedulers.boundedElastic())
.flatMap(td -> loadMessagesImpl(cluster, topic, consumerPosition, query,
filterQueryType, limit, seekDirection, keySerde, valueSerde));
filterQueryType, fixPageSize(pageSize), seekDirection, keySerde, valueSerde));
}
private int fixPageSize(@Nullable Integer pageSize) {
return Optional.ofNullable(pageSize)
.filter(ps -> ps > 0 && ps <= maxPageSize)
.orElse(defaultPageSize);
}
private Flux<TopicMessageEventDTO> loadMessagesImpl(KafkaCluster cluster,

View file

@ -4,6 +4,8 @@ import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toMap;
import static org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableTable;
import com.google.common.collect.Iterables;
import com.google.common.collect.Table;
@ -14,7 +16,6 @@ import com.provectus.kafka.ui.util.KafkaVersion;
import com.provectus.kafka.ui.util.annotation.KafkaClientInternalsDependant;
import java.io.Closeable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
@ -31,8 +32,9 @@ import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nullable;
import lombok.AccessLevel;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.Value;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.admin.AdminClient;
@ -60,16 +62,21 @@ import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.TopicPartitionInfo;
import org.apache.kafka.common.TopicPartitionReplica;
import org.apache.kafka.common.acl.AccessControlEntryFilter;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.acl.AclBindingFilter;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.errors.ClusterAuthorizationException;
import org.apache.kafka.common.errors.GroupIdNotFoundException;
import org.apache.kafka.common.errors.GroupNotEmptyException;
import org.apache.kafka.common.errors.InvalidRequestException;
import org.apache.kafka.common.errors.SecurityDisabledException;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.requests.DescribeLogDirsResponse;
import org.apache.kafka.common.resource.ResourcePatternFilter;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
@ -78,29 +85,33 @@ import reactor.util.function.Tuples;
@Slf4j
@RequiredArgsConstructor
@AllArgsConstructor
public class ReactiveAdminClient implements Closeable {
private enum SupportedFeature {
public enum SupportedFeature {
INCREMENTAL_ALTER_CONFIGS(2.3f),
CONFIG_DOCUMENTATION_RETRIEVAL(2.6f),
DESCRIBE_CLUSTER_INCLUDE_AUTHORIZED_OPERATIONS(2.3f);
DESCRIBE_CLUSTER_INCLUDE_AUTHORIZED_OPERATIONS(2.3f),
AUTHORIZED_SECURITY_ENABLED(ReactiveAdminClient::isAuthorizedSecurityEnabled);
private final float sinceVersion;
private final BiFunction<AdminClient, Float, Mono<Boolean>> predicate;
SupportedFeature(float sinceVersion) {
this.sinceVersion = sinceVersion;
SupportedFeature(BiFunction<AdminClient, Float, Mono<Boolean>> predicate) {
this.predicate = predicate;
}
static Set<SupportedFeature> forVersion(float kafkaVersion) {
return Arrays.stream(SupportedFeature.values())
.filter(f -> kafkaVersion >= f.sinceVersion)
SupportedFeature(float fromVersion) {
this.predicate = (admin, ver) -> Mono.just(ver != null && ver >= fromVersion);
}
static Mono<Set<SupportedFeature>> forVersion(AdminClient ac, String kafkaVersionStr) {
@Nullable Float kafkaVersion = KafkaVersion.parse(kafkaVersionStr).orElse(null);
return Flux.fromArray(SupportedFeature.values())
.flatMap(f -> f.predicate.apply(ac, kafkaVersion).map(enabled -> Tuples.of(f, enabled)))
.filter(Tuple2::getT2)
.map(Tuple2::getT1)
.collect(Collectors.toSet());
}
static Set<SupportedFeature> defaultFeatures() {
return Set.of();
}
}
@Value
@ -109,25 +120,58 @@ public class ReactiveAdminClient implements Closeable {
Node controller;
String clusterId;
Collection<Node> nodes;
@Nullable // null, if ACL is disabled
Set<AclOperation> authorizedOperations;
}
public static Mono<ReactiveAdminClient> create(AdminClient adminClient) {
return getClusterVersion(adminClient)
.map(ver ->
new ReactiveAdminClient(
adminClient,
ver,
getSupportedUpdateFeaturesForVersion(ver)));
@Builder
private record ConfigRelatedInfo(String version,
Set<SupportedFeature> features,
boolean topicDeletionIsAllowed) {
private static Mono<ConfigRelatedInfo> extract(AdminClient ac, int controllerId) {
return loadBrokersConfig(ac, List.of(controllerId))
.map(map -> map.isEmpty() ? List.<ConfigEntry>of() : map.get(controllerId))
.flatMap(configs -> {
String version = "1.0-UNKNOWN";
boolean topicDeletionEnabled = true;
for (ConfigEntry entry : configs) {
if (entry.name().contains("inter.broker.protocol.version")) {
version = entry.value();
}
if (entry.name().equals("delete.topic.enable")) {
topicDeletionEnabled = Boolean.parseBoolean(entry.value());
}
}
var builder = ConfigRelatedInfo.builder()
.version(version)
.topicDeletionIsAllowed(topicDeletionEnabled);
return SupportedFeature.forVersion(ac, version)
.map(features -> builder.features(features).build());
});
}
}
private static Set<SupportedFeature> getSupportedUpdateFeaturesForVersion(String versionStr) {
try {
float version = KafkaVersion.parse(versionStr);
return SupportedFeature.forVersion(version);
} catch (NumberFormatException e) {
return SupportedFeature.defaultFeatures();
}
public static Mono<ReactiveAdminClient> create(AdminClient adminClient) {
return describeClusterImpl(adminClient, Set.of())
// choosing node from which we will get configs (starting with controller)
.flatMap(descr -> descr.controller != null
? Mono.just(descr.controller)
: Mono.justOrEmpty(descr.nodes.stream().findFirst())
)
.flatMap(node -> ConfigRelatedInfo.extract(adminClient, node.id()))
.map(info -> new ReactiveAdminClient(adminClient, info));
}
private static Mono<Boolean> isAuthorizedSecurityEnabled(AdminClient ac, @Nullable Float kafkaVersion) {
return toMono(ac.describeAcls(AclBindingFilter.ANY).values())
.thenReturn(true)
.doOnError(th -> !(th instanceof SecurityDisabledException)
&& !(th instanceof InvalidRequestException)
&& !(th instanceof UnsupportedVersionException),
th -> log.warn("Error checking if security enabled", th))
.onErrorReturn(false);
}
// NOTE: if KafkaFuture returns null, that Mono will be empty(!), since Reactor does not support nullable results
@ -158,8 +202,11 @@ public class ReactiveAdminClient implements Closeable {
@Getter(AccessLevel.PACKAGE) // visible for testing
private final AdminClient client;
private final String version;
private final Set<SupportedFeature> features;
private volatile ConfigRelatedInfo configRelatedInfo;
public Set<SupportedFeature> getClusterFeatures() {
return configRelatedInfo.features();
}
public Mono<Set<String>> listTopics(boolean listInternal) {
return toMono(client.listTopics(new ListTopicsOptions().listInternal(listInternal)).names());
@ -170,7 +217,20 @@ public class ReactiveAdminClient implements Closeable {
}
public String getVersion() {
return version;
return configRelatedInfo.version();
}
public boolean isTopicDeletionEnabled() {
return configRelatedInfo.topicDeletionIsAllowed();
}
public Mono<Void> updateInternalStats(@Nullable Node controller) {
if (controller == null) {
return Mono.empty();
}
return ConfigRelatedInfo.extract(client, controller.id())
.doOnNext(info -> this.configRelatedInfo = info)
.then();
}
public Mono<Map<String, List<ConfigEntry>>> getTopicsConfig() {
@ -180,7 +240,7 @@ public class ReactiveAdminClient implements Closeable {
//NOTE: skips not-found topics (for which UnknownTopicOrPartitionException was thrown by AdminClient)
//and topics for which DESCRIBE_CONFIGS permission is not set (TopicAuthorizationException was thrown)
public Mono<Map<String, List<ConfigEntry>>> getTopicsConfig(Collection<String> topicNames, boolean includeDoc) {
var includeDocFixed = features.contains(SupportedFeature.CONFIG_DOCUMENTATION_RETRIEVAL) && includeDoc;
var includeDocFixed = includeDoc && getClusterFeatures().contains(SupportedFeature.CONFIG_DOCUMENTATION_RETRIEVAL);
// we need to partition calls, because it can lead to AdminClient timeouts in case of large topics count
return partitionCalls(
topicNames,
@ -329,7 +389,7 @@ public class ReactiveAdminClient implements Closeable {
}
public Mono<ClusterDescription> describeCluster() {
return describeClusterImpl(client, features);
return describeClusterImpl(client, getClusterFeatures());
}
private static Mono<ClusterDescription> describeClusterImpl(AdminClient client, Set<SupportedFeature> features) {
@ -351,23 +411,6 @@ public class ReactiveAdminClient implements Closeable {
);
}
private static Mono<String> getClusterVersion(AdminClient client) {
return describeClusterImpl(client, Set.of())
// choosing node from which we will get configs (starting with controller)
.flatMap(descr -> descr.controller != null
? Mono.just(descr.controller)
: Mono.justOrEmpty(descr.nodes.stream().findFirst())
)
.flatMap(node -> loadBrokersConfig(client, List.of(node.id())))
.flatMap(configs -> configs.values().stream()
.flatMap(Collection::stream)
.filter(entry -> entry.name().contains("inter.broker.protocol.version"))
.findFirst()
.map(configEntry -> Mono.just(configEntry.value()))
.orElse(Mono.empty()))
.switchIfEmpty(Mono.just("1.0-UNKNOWN"));
}
public Mono<Void> deleteConsumerGroups(Collection<String> groupIds) {
return toMono(client.deleteConsumerGroups(groupIds).all())
.onErrorResume(GroupIdNotFoundException.class,
@ -401,7 +444,7 @@ public class ReactiveAdminClient implements Closeable {
// NOTE: places whole current topic config with new one. Entries that were present in old config,
// but missed in new will be set to default
public Mono<Void> updateTopicConfig(String topicName, Map<String, String> configs) {
if (features.contains(SupportedFeature.INCREMENTAL_ALTER_CONFIGS)) {
if (getClusterFeatures().contains(SupportedFeature.INCREMENTAL_ALTER_CONFIGS)) {
return getTopicsConfigImpl(List.of(topicName), false)
.map(conf -> conf.getOrDefault(topicName, List.of()))
.flatMap(currentConfigs -> incrementalAlterConfig(topicName, currentConfigs, configs));
@ -498,6 +541,14 @@ public class ReactiveAdminClient implements Closeable {
.flatMap(parts -> listOffsetsUnsafe(parts, offsetSpec));
}
/**
* List offset for the specified topics, skipping no-leader partitions.
*/
public Mono<Map<TopicPartition, Long>> listOffsets(Collection<TopicDescription> topicDescriptions,
OffsetSpec offsetSpec) {
return listOffsetsUnsafe(filterPartitionsWithLeaderCheck(topicDescriptions, p -> true, false), offsetSpec);
}
private Mono<Collection<TopicPartition>> filterPartitionsWithLeaderCheck(Collection<TopicPartition> partitions,
boolean failOnUnknownLeader) {
var targetTopics = partitions.stream().map(TopicPartition::topic).collect(Collectors.toSet());
@ -507,34 +558,44 @@ public class ReactiveAdminClient implements Closeable {
descriptions.values(), partitions::contains, failOnUnknownLeader));
}
private Set<TopicPartition> filterPartitionsWithLeaderCheck(Collection<TopicDescription> topicDescriptions,
@VisibleForTesting
static Set<TopicPartition> filterPartitionsWithLeaderCheck(Collection<TopicDescription> topicDescriptions,
Predicate<TopicPartition> partitionPredicate,
boolean failOnUnknownLeader) {
var goodPartitions = new HashSet<TopicPartition>();
for (TopicDescription description : topicDescriptions) {
var goodTopicPartitions = new ArrayList<TopicPartition>();
for (TopicPartitionInfo partitionInfo : description.partitions()) {
TopicPartition topicPartition = new TopicPartition(description.name(), partitionInfo.partition());
if (!partitionPredicate.test(topicPartition)) {
continue;
if (partitionInfo.leader() == null) {
if (failOnUnknownLeader) {
throw new ValidationException(String.format("Topic partition %s has no leader", topicPartition));
} else {
// if ANY of topic partitions has no leader - we have to skip all topic partitions
goodTopicPartitions.clear();
break;
}
}
if (partitionInfo.leader() != null) {
goodPartitions.add(topicPartition);
} else if (failOnUnknownLeader) {
throw new ValidationException(String.format("Topic partition %s has no leader", topicPartition));
if (partitionPredicate.test(topicPartition)) {
goodTopicPartitions.add(topicPartition);
}
}
goodPartitions.addAll(goodTopicPartitions);
}
return goodPartitions;
}
// 1. NOTE(!): should only apply for partitions with existing leader,
// 1. NOTE(!): should only apply for partitions from topics where all partitions have leaders,
// otherwise AdminClient will try to fetch topic metadata, fail and retry infinitely (until timeout)
// 2. NOTE(!): Skips partitions that were not initialized yet
// (UnknownTopicOrPartitionException thrown, ex. after topic creation)
// 3. TODO: check if it is a bug that AdminClient never throws LeaderNotAvailableException and just retrying instead
@KafkaClientInternalsDependant
public Mono<Map<TopicPartition, Long>> listOffsetsUnsafe(Collection<TopicPartition> partitions,
OffsetSpec offsetSpec) {
@VisibleForTesting
Mono<Map<TopicPartition, Long>> listOffsetsUnsafe(Collection<TopicPartition> partitions, OffsetSpec offsetSpec) {
if (partitions.isEmpty()) {
return Mono.just(Map.of());
}
Function<Collection<TopicPartition>, Mono<Map<TopicPartition, Long>>> call =
parts -> {
@ -557,6 +618,22 @@ public class ReactiveAdminClient implements Closeable {
);
}
public Mono<Collection<AclBinding>> listAcls(ResourcePatternFilter filter) {
Preconditions.checkArgument(getClusterFeatures().contains(SupportedFeature.AUTHORIZED_SECURITY_ENABLED));
return toMono(client.describeAcls(new AclBindingFilter(filter, AccessControlEntryFilter.ANY)).values());
}
public Mono<Void> createAcls(Collection<AclBinding> aclBindings) {
Preconditions.checkArgument(getClusterFeatures().contains(SupportedFeature.AUTHORIZED_SECURITY_ENABLED));
return toMono(client.createAcls(aclBindings).all());
}
public Mono<Void> deleteAcls(Collection<AclBinding> aclBindings) {
Preconditions.checkArgument(getClusterFeatures().contains(SupportedFeature.AUTHORIZED_SECURITY_ENABLED));
var filters = aclBindings.stream().map(AclBinding::toFilter).collect(Collectors.toSet());
return toMono(client.deleteAcls(filters).all()).then();
}
public Mono<Void> updateBrokerConfigByName(Integer brokerId, String name, String value) {
ConfigResource cr = new ConfigResource(ConfigResource.Type.BROKER, String.valueOf(brokerId));
AlterConfigOp op = new AlterConfigOp(new ConfigEntry(name, value), AlterConfigOp.OpType.SET);

View file

@ -37,25 +37,26 @@ public class StatisticsService {
private Mono<Statistics> getStatistics(KafkaCluster cluster) {
return adminClientService.get(cluster).flatMap(ac ->
ac.describeCluster().flatMap(description ->
Mono.zip(
List.of(
metricsCollector.getBrokerMetrics(cluster, description.getNodes()),
getLogDirInfo(description, ac),
featureService.getAvailableFeatures(cluster, description),
loadTopicConfigs(cluster),
describeTopics(cluster)),
results ->
Statistics.builder()
.status(ServerStatusDTO.ONLINE)
.clusterDescription(description)
.version(ac.getVersion())
.metrics((Metrics) results[0])
.logDirInfo((InternalLogDirStats) results[1])
.features((List<ClusterFeature>) results[2])
.topicConfigs((Map<String, List<ConfigEntry>>) results[3])
.topicDescriptions((Map<String, TopicDescription>) results[4])
.build()
)))
ac.updateInternalStats(description.getController()).then(
Mono.zip(
List.of(
metricsCollector.getBrokerMetrics(cluster, description.getNodes()),
getLogDirInfo(description, ac),
featureService.getAvailableFeatures(ac, cluster, description),
loadTopicConfigs(cluster),
describeTopics(cluster)),
results ->
Statistics.builder()
.status(ServerStatusDTO.ONLINE)
.clusterDescription(description)
.version(ac.getVersion())
.metrics((Metrics) results[0])
.logDirInfo((InternalLogDirStats) results[1])
.features((List<ClusterFeature>) results[2])
.topicConfigs((Map<String, List<ConfigEntry>>) results[3])
.topicDescriptions((Map<String, TopicDescription>) results[4])
.build()
))))
.doOnError(e ->
log.error("Failed to collect cluster {} info", cluster.getName(), e))
.onErrorResume(

View file

@ -3,6 +3,7 @@ package com.provectus.kafka.ui.service;
import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toMap;
import com.google.common.collect.Sets;
import com.provectus.kafka.ui.config.ClustersProperties;
import com.provectus.kafka.ui.exception.TopicMetadataException;
import com.provectus.kafka.ui.exception.TopicNotFoundException;
@ -136,22 +137,14 @@ public class TopicsService {
}
private Mono<InternalPartitionsOffsets> getPartitionOffsets(Map<String, TopicDescription>
descriptions,
descriptionsMap,
ReactiveAdminClient ac) {
var topicPartitions = descriptions.values().stream()
.flatMap(desc ->
desc.partitions().stream()
// list offsets should only be applied to partitions with existing leader
// (see ReactiveAdminClient.listOffsetsUnsafe(..) docs)
.filter(tp -> tp.leader() != null)
.map(p -> new TopicPartition(desc.name(), p.partition())))
.collect(toList());
return ac.listOffsetsUnsafe(topicPartitions, OffsetSpec.earliest())
.zipWith(ac.listOffsetsUnsafe(topicPartitions, OffsetSpec.latest()),
var descriptions = descriptionsMap.values();
return ac.listOffsets(descriptions, OffsetSpec.earliest())
.zipWith(ac.listOffsets(descriptions, OffsetSpec.latest()),
(earliest, latest) ->
topicPartitions.stream()
.filter(tp -> earliest.containsKey(tp) && latest.containsKey(tp))
Sets.intersection(earliest.keySet(), latest.keySet())
.stream()
.map(tp ->
Map.entry(tp,
new InternalPartitionsOffsets.Offsets(

View file

@ -0,0 +1,81 @@
package com.provectus.kafka.ui.service.acl;
import com.provectus.kafka.ui.exception.ValidationException;
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.kafka.common.acl.AccessControlEntry;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.acl.AclPermissionType;
import org.apache.kafka.common.resource.PatternType;
import org.apache.kafka.common.resource.ResourcePattern;
import org.apache.kafka.common.resource.ResourceType;
public class AclCsv {
private static final String LINE_SEPARATOR = System.lineSeparator();
private static final String VALUES_SEPARATOR = ",";
private static final String HEADER = "Principal,ResourceType,PatternType,ResourceName,Operation,PermissionType,Host";
public static String transformToCsvString(Collection<AclBinding> acls) {
return Stream.concat(Stream.of(HEADER), acls.stream().map(AclCsv::createAclString))
.collect(Collectors.joining(System.lineSeparator()));
}
public static String createAclString(AclBinding binding) {
var pattern = binding.pattern();
var filter = binding.toFilter().entryFilter();
return String.format(
"%s,%s,%s,%s,%s,%s,%s",
filter.principal(),
pattern.resourceType(),
pattern.patternType(),
pattern.name(),
filter.operation(),
filter.permissionType(),
filter.host()
);
}
private static AclBinding parseCsvLine(String csv, int line) {
String[] values = csv.split(VALUES_SEPARATOR);
if (values.length != 7) {
throw new ValidationException("Input csv is not valid - there should be 7 columns in line " + line);
}
for (int i = 0; i < values.length; i++) {
if ((values[i] = values[i].trim()).isBlank()) {
throw new ValidationException("Input csv is not valid - blank value in colum " + i + ", line " + line);
}
}
try {
return new AclBinding(
new ResourcePattern(
ResourceType.valueOf(values[1]), values[3], PatternType.valueOf(values[2])),
new AccessControlEntry(
values[0], values[6], AclOperation.valueOf(values[4]), AclPermissionType.valueOf(values[5]))
);
} catch (IllegalArgumentException enumParseError) {
throw new ValidationException("Error parsing enum value in line " + line);
}
}
public static Collection<AclBinding> parseCsv(String csvString) {
String[] lines = csvString.split(LINE_SEPARATOR);
if (lines.length == 0) {
throw new ValidationException("Error parsing ACL csv file: no lines in file");
}
boolean firstLineIsHeader = HEADER.equalsIgnoreCase(lines[0].trim().replace(" ", ""));
Set<AclBinding> result = new HashSet<>();
for (int i = firstLineIsHeader ? 1 : 0; i < lines.length; i++) {
String line = lines[i];
if (!line.isBlank()) {
AclBinding aclBinding = parseCsvLine(line, i);
result.add(aclBinding);
}
}
return result;
}
}

View file

@ -0,0 +1,93 @@
package com.provectus.kafka.ui.service.acl;
import com.google.common.collect.Sets;
import com.provectus.kafka.ui.model.KafkaCluster;
import com.provectus.kafka.ui.service.AdminClientService;
import java.util.List;
import java.util.Set;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.resource.ResourcePatternFilter;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@Slf4j
@Service
@RequiredArgsConstructor
public class AclsService {
private final AdminClientService adminClientService;
public Mono<Void> createAcl(KafkaCluster cluster, AclBinding aclBinding) {
var aclString = AclCsv.createAclString(aclBinding);
log.info("CREATING ACL: [{}]", aclString);
return adminClientService.get(cluster)
.flatMap(ac -> ac.createAcls(List.of(aclBinding)))
.doOnSuccess(v -> log.info("ACL CREATED: [{}]", aclString));
}
public Mono<Void> deleteAcl(KafkaCluster cluster, AclBinding aclBinding) {
var aclString = AclCsv.createAclString(aclBinding);
log.info("DELETING ACL: [{}]", aclString);
return adminClientService.get(cluster)
.flatMap(ac -> ac.deleteAcls(List.of(aclBinding)))
.doOnSuccess(v -> log.info("ACL DELETED: [{}]", aclString));
}
public Flux<AclBinding> listAcls(KafkaCluster cluster, ResourcePatternFilter filter) {
return adminClientService.get(cluster)
.flatMap(c -> c.listAcls(filter))
.flatMapIterable(acls -> acls);
}
public Mono<String> getAclAsCsvString(KafkaCluster cluster) {
return adminClientService.get(cluster)
.flatMap(c -> c.listAcls(ResourcePatternFilter.ANY))
.map(AclCsv::transformToCsvString);
}
public Mono<Void> syncAclWithAclCsv(KafkaCluster cluster, String csv) {
return adminClientService.get(cluster)
.flatMap(ac -> ac.listAcls(ResourcePatternFilter.ANY).flatMap(existingAclList -> {
var existingSet = Set.copyOf(existingAclList);
var newAcls = Set.copyOf(AclCsv.parseCsv(csv));
var toDelete = Sets.difference(existingSet, newAcls);
var toAdd = Sets.difference(newAcls, existingSet);
logAclSyncPlan(cluster, toAdd, toDelete);
if (toAdd.isEmpty() && toDelete.isEmpty()) {
return Mono.empty();
}
log.info("Starting new ACLs creation");
return ac.createAcls(toAdd)
.doOnSuccess(v -> {
log.info("{} new ACLs created", toAdd.size());
log.info("Starting ACLs deletion");
})
.then(ac.deleteAcls(toDelete)
.doOnSuccess(v -> log.info("{} ACLs deleted", toDelete.size())));
}));
}
private void logAclSyncPlan(KafkaCluster cluster, Set<AclBinding> toBeAdded, Set<AclBinding> toBeDeleted) {
log.info("'{}' cluster ACL sync plan: ", cluster.getName());
if (toBeAdded.isEmpty() && toBeDeleted.isEmpty()) {
log.info("Nothing to do, ACL is already in sync");
return;
}
if (!toBeAdded.isEmpty()) {
log.info("ACLs to be added ({}): ", toBeAdded.size());
for (AclBinding aclBinding : toBeAdded) {
log.info(" " + AclCsv.createAclString(aclBinding));
}
}
if (!toBeDeleted.isEmpty()) {
log.info("ACLs to be deleted ({}): ", toBeDeleted.size());
for (AclBinding aclBinding : toBeDeleted) {
log.info(" " + AclCsv.createAclString(aclBinding));
}
}
}
}

View file

@ -43,8 +43,7 @@ class TopicAnalysisStats {
Long max;
final UpdateDoublesSketch sizeSketch = DoublesSketch.builder().build();
void apply(byte[] bytes) {
int len = bytes.length;
void apply(int len) {
sum += len;
min = minNullable(min, len);
max = maxNullable(max, len);
@ -98,7 +97,7 @@ class TopicAnalysisStats {
if (rec.key() != null) {
byte[] keyBytes = rec.key().get();
keysSize.apply(keyBytes);
keysSize.apply(rec.serializedKeySize());
uniqKeys.update(keyBytes);
} else {
nullKeys++;
@ -106,7 +105,7 @@ class TopicAnalysisStats {
if (rec.value() != null) {
byte[] valueBytes = rec.value().get();
valuesSize.apply(valueBytes);
valuesSize.apply(rec.serializedValueSize());
uniqValues.update(valueBytes);
} else {
nullValues++;

View file

@ -44,7 +44,7 @@ public class DataMasking {
public static DataMasking create(@Nullable List<ClustersProperties.Masking> config) {
return new DataMasking(
Optional.ofNullable(config).orElse(List.of()).stream().map(property -> {
Preconditions.checkNotNull(property.getType(), "masking type not specifed");
Preconditions.checkNotNull(property.getType(), "masking type not specified");
Preconditions.checkArgument(
StringUtils.isNotEmpty(property.getTopicKeysPattern())
|| StringUtils.isNotEmpty(property.getTopicValuesPattern()),

View file

@ -0,0 +1,28 @@
package com.provectus.kafka.ui.service.masking.policies;
import com.provectus.kafka.ui.config.ClustersProperties;
import com.provectus.kafka.ui.exception.ValidationException;
import java.util.regex.Pattern;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;
interface FieldsSelector {
static FieldsSelector create(ClustersProperties.Masking property) {
if (StringUtils.hasText(property.getFieldsNamePattern()) && !CollectionUtils.isEmpty(property.getFields())) {
throw new ValidationException("You can't provide both fieldNames & fieldsNamePattern for masking");
}
if (StringUtils.hasText(property.getFieldsNamePattern())) {
Pattern pattern = Pattern.compile(property.getFieldsNamePattern());
return f -> pattern.matcher(f).matches();
}
if (!CollectionUtils.isEmpty(property.getFields())) {
return f -> property.getFields().contains(f);
}
//no pattern, no field names - mean all fields should be masked
return fieldName -> true;
}
boolean shouldBeMasked(String fieldName);
}

View file

@ -15,8 +15,8 @@ class Mask extends MaskingPolicy {
private final UnaryOperator<String> masker;
Mask(List<String> fieldNames, List<String> maskingChars) {
super(fieldNames);
Mask(FieldsSelector fieldsSelector, List<String> maskingChars) {
super(fieldsSelector);
this.masker = createMasker(maskingChars);
}
@ -38,22 +38,13 @@ class Mask extends MaskingPolicy {
for (int i = 0; i < input.length(); i++) {
int cp = input.codePointAt(i);
switch (Character.getType(cp)) {
case Character.SPACE_SEPARATOR:
case Character.LINE_SEPARATOR:
case Character.PARAGRAPH_SEPARATOR:
sb.appendCodePoint(cp); // keeping separators as-is
break;
case Character.UPPERCASE_LETTER:
sb.append(maskingChars.get(0));
break;
case Character.LOWERCASE_LETTER:
sb.append(maskingChars.get(1));
break;
case Character.DECIMAL_DIGIT_NUMBER:
sb.append(maskingChars.get(2));
break;
default:
sb.append(maskingChars.get(3));
case Character.SPACE_SEPARATOR,
Character.LINE_SEPARATOR,
Character.PARAGRAPH_SEPARATOR -> sb.appendCodePoint(cp); // keeping separators as-is
case Character.UPPERCASE_LETTER -> sb.append(maskingChars.get(0));
case Character.LOWERCASE_LETTER -> sb.append(maskingChars.get(1));
case Character.DECIMAL_DIGIT_NUMBER -> sb.append(maskingChars.get(2));
default -> sb.append(maskingChars.get(3));
}
}
return sb.toString();

View file

@ -2,46 +2,36 @@ package com.provectus.kafka.ui.service.masking.policies;
import com.fasterxml.jackson.databind.node.ContainerNode;
import com.provectus.kafka.ui.config.ClustersProperties;
import java.util.List;
import lombok.RequiredArgsConstructor;
@RequiredArgsConstructor
public abstract class MaskingPolicy {
public static MaskingPolicy create(ClustersProperties.Masking property) {
List<String> fields = property.getFields() == null
? List.of() // empty list means that policy will be applied to all fields
: property.getFields();
switch (property.getType()) {
case REMOVE:
return new Remove(fields);
case REPLACE:
return new Replace(
fields,
property.getReplacement() == null
? Replace.DEFAULT_REPLACEMENT
: property.getReplacement()
);
case MASK:
return new Mask(
fields,
property.getPattern() == null
? Mask.DEFAULT_PATTERN
: property.getPattern()
);
default:
throw new IllegalStateException("Unknown policy type: " + property.getType());
}
FieldsSelector fieldsSelector = FieldsSelector.create(property);
return switch (property.getType()) {
case REMOVE -> new Remove(fieldsSelector);
case REPLACE -> new Replace(
fieldsSelector,
property.getReplacement() == null
? Replace.DEFAULT_REPLACEMENT
: property.getReplacement()
);
case MASK -> new Mask(
fieldsSelector,
property.getMaskingCharsReplacement() == null
? Mask.DEFAULT_PATTERN
: property.getMaskingCharsReplacement()
);
};
}
//----------------------------------------------------------------
// empty list means policy will be applied to all fields
private final List<String> fieldNames;
private final FieldsSelector fieldsSelector;
protected boolean fieldShouldBeMasked(String fieldName) {
return fieldNames.isEmpty() || fieldNames.contains(fieldName);
return fieldsSelector.shouldBeMasked(fieldName);
}
public abstract ContainerNode<?> applyToJsonContainer(ContainerNode<?> node);

View file

@ -4,12 +4,12 @@ import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ContainerNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import java.util.List;
class Remove extends MaskingPolicy {
Remove(List<String> fieldNames) {
super(fieldNames);
Remove(FieldsSelector fieldsSelector) {
super(fieldsSelector);
}
@Override

View file

@ -6,7 +6,6 @@ import com.fasterxml.jackson.databind.node.ContainerNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.fasterxml.jackson.databind.node.TextNode;
import com.google.common.base.Preconditions;
import java.util.List;
class Replace extends MaskingPolicy {
@ -14,8 +13,8 @@ class Replace extends MaskingPolicy {
private final String replacement;
Replace(List<String> fieldNames, String replacementString) {
super(fieldNames);
Replace(FieldsSelector fieldsSelector, String replacementString) {
super(fieldsSelector);
this.replacement = Preconditions.checkNotNull(replacementString);
}

View file

@ -61,7 +61,9 @@ class JmxSslSocketFactory extends javax.net.ssl.SSLSocketFactory {
} catch (Exception e) {
log.error("----------------------------------");
log.error("SSL can't be enabled for JMX retrieval. "
+ "Make sure your java app run with '--add-opens java.rmi/javax.rmi.ssl=ALL-UNNAMED' arg.", e);
+ "Make sure your java app run with '--add-opens java.rmi/javax.rmi.ssl=ALL-UNNAMED' arg. Err: {}",
e.getMessage());
log.trace("SSL can't be enabled for JMX retrieval", e);
log.error("----------------------------------");
}
SSL_JMX_SUPPORTED = sslJmxSupported;

View file

@ -12,6 +12,7 @@ import com.provectus.kafka.ui.model.rbac.AccessContext;
import com.provectus.kafka.ui.model.rbac.Permission;
import com.provectus.kafka.ui.model.rbac.Resource;
import com.provectus.kafka.ui.model.rbac.Role;
import com.provectus.kafka.ui.model.rbac.Subject;
import com.provectus.kafka.ui.model.rbac.permission.ConnectAction;
import com.provectus.kafka.ui.model.rbac.permission.ConsumerGroupAction;
import com.provectus.kafka.ui.model.rbac.permission.SchemaAction;
@ -19,11 +20,12 @@ import com.provectus.kafka.ui.model.rbac.permission.TopicAction;
import com.provectus.kafka.ui.service.rbac.extractor.CognitoAuthorityExtractor;
import com.provectus.kafka.ui.service.rbac.extractor.GithubAuthorityExtractor;
import com.provectus.kafka.ui.service.rbac.extractor.GoogleAuthorityExtractor;
import com.provectus.kafka.ui.service.rbac.extractor.LdapAuthorityExtractor;
import com.provectus.kafka.ui.service.rbac.extractor.OauthAuthorityExtractor;
import com.provectus.kafka.ui.service.rbac.extractor.ProviderAuthorityExtractor;
import jakarta.annotation.PostConstruct;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.function.Predicate;
import java.util.regex.Pattern;
@ -34,6 +36,7 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.core.env.Environment;
import org.springframework.security.access.AccessDeniedException;
import org.springframework.security.core.context.ReactiveSecurityContextHolder;
import org.springframework.security.core.context.SecurityContext;
@ -50,10 +53,11 @@ public class AccessControlService {
@Nullable
private final InMemoryReactiveClientRegistrationRepository clientRegistrationRepository;
private final RoleBasedAccessControlProperties properties;
private final Environment environment;
private boolean rbacEnabled = false;
private Set<ProviderAuthorityExtractor> extractors = Collections.emptySet();
private final RoleBasedAccessControlProperties properties;
private Set<ProviderAuthorityExtractor> oauthExtractors = Collections.emptySet();
@PostConstruct
public void init() {
@ -63,21 +67,27 @@ public class AccessControlService {
}
rbacEnabled = true;
this.extractors = properties.getRoles()
this.oauthExtractors = properties.getRoles()
.stream()
.map(role -> role.getSubjects()
.stream()
.map(provider -> switch (provider.getProvider()) {
.map(Subject::getProvider)
.distinct()
.map(provider -> switch (provider) {
case OAUTH_COGNITO -> new CognitoAuthorityExtractor();
case OAUTH_GOOGLE -> new GoogleAuthorityExtractor();
case OAUTH_GITHUB -> new GithubAuthorityExtractor();
case LDAP, LDAP_AD -> new LdapAuthorityExtractor();
}).collect(Collectors.toSet()))
case OAUTH -> new OauthAuthorityExtractor();
default -> null;
})
.filter(Objects::nonNull)
.collect(Collectors.toSet()))
.flatMap(Set::stream)
.collect(Collectors.toSet());
if ((clientRegistrationRepository == null || !clientRegistrationRepository.iterator().hasNext())
&& !properties.getRoles().isEmpty()) {
if (!properties.getRoles().isEmpty()
&& "oauth2".equalsIgnoreCase(environment.getProperty("auth.type"))
&& (clientRegistrationRepository == null || !clientRegistrationRepository.iterator().hasNext())) {
log.error("Roles are configured but no authentication methods are present. Authentication might fail.");
}
}
@ -354,8 +364,8 @@ public class AccessControlService {
return isAccessible(Resource.KSQL, null, user, context, requiredActions);
}
public Set<ProviderAuthorityExtractor> getExtractors() {
return extractors;
public Set<ProviderAuthorityExtractor> getOauthExtractors() {
return oauthExtractors;
}
public List<Role> getRoles() {

View file

@ -1,5 +1,8 @@
package com.provectus.kafka.ui.service.rbac.extractor;
import static com.provectus.kafka.ui.model.rbac.provider.Provider.Name.COGNITO;
import com.google.common.collect.Sets;
import com.provectus.kafka.ui.model.rbac.Role;
import com.provectus.kafka.ui.model.rbac.provider.Provider;
import com.provectus.kafka.ui.service.rbac.AccessControlService;
@ -18,8 +21,8 @@ public class CognitoAuthorityExtractor implements ProviderAuthorityExtractor {
private static final String COGNITO_GROUPS_ATTRIBUTE_NAME = "cognito:groups";
@Override
public boolean isApplicable(String provider) {
return Provider.Name.COGNITO.equalsIgnoreCase(provider);
public boolean isApplicable(String provider, Map<String, String> customParams) {
return COGNITO.equalsIgnoreCase(provider) || COGNITO.equalsIgnoreCase(customParams.get(TYPE));
}
@Override
@ -63,7 +66,7 @@ public class CognitoAuthorityExtractor implements ProviderAuthorityExtractor {
.map(Role::getName)
.collect(Collectors.toSet());
return Mono.just(Stream.concat(groupsByUsername.stream(), groupsByGroups.stream()).collect(Collectors.toSet()));
return Mono.just(Sets.union(groupsByUsername, groupsByGroups));
}
}

View file

@ -1,5 +1,7 @@
package com.provectus.kafka.ui.service.rbac.extractor;
import static com.provectus.kafka.ui.model.rbac.provider.Provider.Name.GITHUB;
import com.provectus.kafka.ui.model.rbac.Role;
import com.provectus.kafka.ui.model.rbac.provider.Provider;
import com.provectus.kafka.ui.service.rbac.AccessControlService;
@ -28,8 +30,8 @@ public class GithubAuthorityExtractor implements ProviderAuthorityExtractor {
private static final String DUMMY = "dummy";
@Override
public boolean isApplicable(String provider) {
return Provider.Name.GITHUB.equalsIgnoreCase(provider);
public boolean isApplicable(String provider, Map<String, String> customParams) {
return GITHUB.equalsIgnoreCase(provider) || GITHUB.equalsIgnoreCase(customParams.get(TYPE));
}
@Override

View file

@ -1,13 +1,14 @@
package com.provectus.kafka.ui.service.rbac.extractor;
import static com.provectus.kafka.ui.model.rbac.provider.Provider.Name.GOOGLE;
import com.google.common.collect.Sets;
import com.provectus.kafka.ui.model.rbac.Role;
import com.provectus.kafka.ui.model.rbac.provider.Provider;
import com.provectus.kafka.ui.service.rbac.AccessControlService;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import lombok.extern.slf4j.Slf4j;
import org.springframework.security.oauth2.core.user.DefaultOAuth2User;
import reactor.core.publisher.Mono;
@ -19,8 +20,8 @@ public class GoogleAuthorityExtractor implements ProviderAuthorityExtractor {
public static final String EMAIL_ATTRIBUTE_NAME = "email";
@Override
public boolean isApplicable(String provider) {
return Provider.Name.GOOGLE.equalsIgnoreCase(provider);
public boolean isApplicable(String provider, Map<String, String> customParams) {
return GOOGLE.equalsIgnoreCase(provider) || GOOGLE.equalsIgnoreCase(customParams.get(TYPE));
}
@Override
@ -52,7 +53,7 @@ public class GoogleAuthorityExtractor implements ProviderAuthorityExtractor {
return Mono.just(groupsByUsername);
}
List<String> groupsByDomain = acs.getRoles()
Set<String> groupsByDomain = acs.getRoles()
.stream()
.filter(r -> r.getSubjects()
.stream()
@ -60,10 +61,9 @@ public class GoogleAuthorityExtractor implements ProviderAuthorityExtractor {
.filter(s -> s.getType().equals("domain"))
.anyMatch(s -> s.getValue().equals(domain)))
.map(Role::getName)
.toList();
.collect(Collectors.toSet());
return Mono.just(Stream.concat(groupsByUsername.stream(), groupsByDomain.stream())
.collect(Collectors.toSet()));
return Mono.just(Sets.union(groupsByUsername, groupsByDomain));
}
}

View file

@ -1,23 +0,0 @@
package com.provectus.kafka.ui.service.rbac.extractor;
import com.provectus.kafka.ui.service.rbac.AccessControlService;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Mono;
@Slf4j
public class LdapAuthorityExtractor implements ProviderAuthorityExtractor {
@Override
public boolean isApplicable(String provider) {
return false; // TODO #2752
}
@Override
public Mono<Set<String>> extract(AccessControlService acs, Object value, Map<String, Object> additionalParams) {
return Mono.just(Collections.emptySet()); // TODO #2752
}
}

View file

@ -1,22 +1,44 @@
package com.provectus.kafka.ui.service.rbac.extractor;
import static com.provectus.kafka.ui.model.rbac.provider.Provider.Name.OAUTH;
import com.google.common.collect.Sets;
import com.provectus.kafka.ui.config.auth.OAuthProperties;
import com.provectus.kafka.ui.model.rbac.Role;
import com.provectus.kafka.ui.model.rbac.provider.Provider;
import com.provectus.kafka.ui.service.rbac.AccessControlService;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.springframework.security.oauth2.core.user.DefaultOAuth2User;
import org.springframework.util.Assert;
import reactor.core.publisher.Mono;
@Slf4j
public class OauthAuthorityExtractor implements ProviderAuthorityExtractor {
public static final String ROLES_FIELD_PARAM_NAME = "roles-field";
@Override
public boolean isApplicable(String provider) {
return false; // TODO #2844
public boolean isApplicable(String provider, Map<String, String> customParams) {
var containsRolesFieldNameParam = customParams.containsKey(ROLES_FIELD_PARAM_NAME);
if (!containsRolesFieldNameParam) {
log.debug("Provider [{}] doesn't contain a roles field param name, mapping won't be performed", provider);
return false;
}
return OAUTH.equalsIgnoreCase(provider) || OAUTH.equalsIgnoreCase(customParams.get(TYPE));
}
@Override
public Mono<Set<String>> extract(AccessControlService acs, Object value, Map<String, Object> additionalParams) {
log.trace("Extracting OAuth2 user authorities");
DefaultOAuth2User principal;
try {
principal = (DefaultOAuth2User) value;
@ -25,7 +47,67 @@ public class OauthAuthorityExtractor implements ProviderAuthorityExtractor {
throw new RuntimeException();
}
return Mono.just(Set.of(principal.getName())); // TODO #2844
var provider = (OAuthProperties.OAuth2Provider) additionalParams.get("provider");
Assert.notNull(provider, "provider is null");
var rolesFieldName = provider.getCustomParams().get(ROLES_FIELD_PARAM_NAME);
Set<String> rolesByUsername = acs.getRoles()
.stream()
.filter(r -> r.getSubjects()
.stream()
.filter(s -> s.getProvider().equals(Provider.OAUTH))
.filter(s -> s.getType().equals("user"))
.anyMatch(s -> s.getValue().equals(principal.getName())))
.map(Role::getName)
.collect(Collectors.toSet());
Set<String> rolesByRolesField = acs.getRoles()
.stream()
.filter(role -> role.getSubjects()
.stream()
.filter(s -> s.getProvider().equals(Provider.OAUTH))
.filter(s -> s.getType().equals("role"))
.anyMatch(subject -> {
var roleName = subject.getValue();
var principalRoles = convertRoles(principal.getAttribute(rolesFieldName));
var roleMatched = principalRoles.contains(roleName);
if (roleMatched) {
log.debug("Assigning role [{}] to user [{}]", roleName, principal.getName());
} else {
log.trace("Role [{}] not found in user [{}] roles", roleName, principal.getName());
}
return roleMatched;
})
)
.map(Role::getName)
.collect(Collectors.toSet());
return Mono.just(Sets.union(rolesByUsername, rolesByRolesField));
}
@SuppressWarnings("unchecked")
private Collection<String> convertRoles(Object roles) {
if (roles == null) {
log.debug("Param missing from attributes, skipping");
return Collections.emptySet();
}
if ((roles instanceof List<?>) || (roles instanceof Set<?>)) {
log.trace("The field is either a set or a list, returning as is");
return (Collection<String>) roles;
}
if (!(roles instanceof String)) {
log.debug("The field is not a string, skipping");
return Collections.emptySet();
}
log.trace("Trying to deserialize the field value [{}] as a string", roles);
return Arrays.stream(((String) roles).split(","))
.collect(Collectors.toSet());
}
}

View file

@ -7,7 +7,9 @@ import reactor.core.publisher.Mono;
public interface ProviderAuthorityExtractor {
boolean isApplicable(String provider);
String TYPE = "type";
boolean isApplicable(String provider, Map<String, String> customParams);
Mono<Set<String>> extract(AccessControlService acs, Object value, Map<String, Object> additionalParams);

View file

@ -2,6 +2,7 @@ package com.provectus.kafka.ui.util;
import com.provectus.kafka.ui.config.ClustersProperties;
import com.provectus.kafka.ui.config.WebclientProperties;
import com.provectus.kafka.ui.config.auth.OAuthProperties;
import com.provectus.kafka.ui.config.auth.RoleBasedAccessControlProperties;
import com.provectus.kafka.ui.exception.FileUploadException;
@ -89,6 +90,7 @@ public class DynamicConfigOperations {
}
public PropertiesStructure getCurrentProperties() {
checkIfDynamicConfigEnabled();
return PropertiesStructure.builder()
.kafka(getNullableBean(ClustersProperties.class))
.rbac(getNullableBean(RoleBasedAccessControlProperties.class))
@ -97,6 +99,7 @@ public class DynamicConfigOperations {
.type(ctx.getEnvironment().getProperty("auth.type"))
.oauth2(getNullableBean(OAuthProperties.class))
.build())
.webclient(getNullableBean(WebclientProperties.class))
.build();
}
@ -110,11 +113,7 @@ public class DynamicConfigOperations {
}
public void persist(PropertiesStructure properties) {
if (!dynamicConfigEnabled()) {
throw new ValidationException(
"Dynamic config change is not allowed. "
+ "Set dynamic.config.enabled property to 'true' to enabled it.");
}
checkIfDynamicConfigEnabled();
properties.initAndValidate();
String yaml = serializeToYaml(properties);
@ -122,8 +121,9 @@ public class DynamicConfigOperations {
}
public Mono<Path> uploadConfigRelatedFile(FilePart file) {
String targetDirStr = (String) ctx.getEnvironment().getSystemEnvironment()
.getOrDefault(CONFIG_RELATED_UPLOADS_DIR_PROPERTY, CONFIG_RELATED_UPLOADS_DIR_DEFAULT);
checkIfDynamicConfigEnabled();
String targetDirStr = ctx.getEnvironment()
.getProperty(CONFIG_RELATED_UPLOADS_DIR_PROPERTY, CONFIG_RELATED_UPLOADS_DIR_DEFAULT);
Path targetDir = Path.of(targetDirStr);
if (!Files.exists(targetDir)) {
@ -147,6 +147,14 @@ public class DynamicConfigOperations {
.onErrorMap(th -> new FileUploadException(targetFilePath, th));
}
private void checkIfDynamicConfigEnabled() {
if (!dynamicConfigEnabled()) {
throw new ValidationException(
"Dynamic config change is not allowed. "
+ "Set dynamic.config.enabled property to 'true' to enabled it.");
}
}
@SneakyThrows
private void writeYamlToFile(String yaml, Path path) {
if (Files.isDirectory(path)) {
@ -204,6 +212,7 @@ public class DynamicConfigOperations {
private ClustersProperties kafka;
private RoleBasedAccessControlProperties rbac;
private Auth auth;
private WebclientProperties webclient;
@Data
@Builder
@ -221,7 +230,10 @@ public class DynamicConfigOperations {
Optional.ofNullable(auth)
.flatMap(a -> Optional.ofNullable(a.oauth2))
.ifPresent(OAuthProperties::validate);
.ifPresent(OAuthProperties::init);
Optional.ofNullable(webclient)
.ifPresent(WebclientProperties::validate);
}
}

View file

@ -1,24 +1,21 @@
package com.provectus.kafka.ui.util;
import lombok.extern.slf4j.Slf4j;
import java.util.Optional;
@Slf4j
public final class KafkaVersion {
private KafkaVersion() {
}
public static float parse(String version) throws NumberFormatException {
log.trace("Parsing cluster version [{}]", version);
public static Optional<Float> parse(String version) throws NumberFormatException {
try {
final String[] parts = version.split("\\.");
if (parts.length > 2) {
version = parts[0] + "." + parts[1];
}
return Float.parseFloat(version.split("-")[0]);
return Optional.of(Float.parseFloat(version.split("-")[0]));
} catch (Exception e) {
log.error("Conversion clusterVersion [{}] to float value failed", version, e);
throw e;
return Optional.empty();
}
}
}

View file

@ -4,9 +4,9 @@ import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
// Specifies field that can contain any kind of value - primitive, complex and nulls
public class AnyFieldSchema implements FieldSchema {
class AnyFieldSchema implements FieldSchema {
public static AnyFieldSchema get() {
static AnyFieldSchema get() {
return new AnyFieldSchema();
}

View file

@ -4,10 +4,10 @@ import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
public class ArrayFieldSchema implements FieldSchema {
class ArrayFieldSchema implements FieldSchema {
private final FieldSchema itemsSchema;
public ArrayFieldSchema(FieldSchema itemsSchema) {
ArrayFieldSchema(FieldSchema itemsSchema) {
this.itemsSchema = itemsSchema;
}

View file

@ -7,10 +7,10 @@ import java.util.List;
import java.util.Map;
public class EnumJsonType extends JsonType {
class EnumJsonType extends JsonType {
private final List<String> values;
public EnumJsonType(List<String> values) {
EnumJsonType(List<String> values) {
super(Type.ENUM);
this.values = values;
}

View file

@ -3,6 +3,6 @@ package com.provectus.kafka.ui.util.jsonschema;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
public interface FieldSchema {
interface FieldSchema {
JsonNode toJsonNode(ObjectMapper mapper);
}

View file

@ -4,7 +4,7 @@ import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.Map;
public abstract class JsonType {
abstract class JsonType {
protected final Type type;
@ -12,13 +12,13 @@ public abstract class JsonType {
this.type = type;
}
public Type getType() {
Type getType() {
return type;
}
public abstract Map<String, JsonNode> toJsonNode(ObjectMapper mapper);
abstract Map<String, JsonNode> toJsonNode(ObjectMapper mapper);
public enum Type {
enum Type {
NULL,
BOOLEAN,
OBJECT,

View file

@ -2,21 +2,27 @@ package com.provectus.kafka.ui.util.jsonschema;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.BooleanNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.fasterxml.jackson.databind.node.TextNode;
import javax.annotation.Nullable;
public class MapFieldSchema implements FieldSchema {
private final FieldSchema itemSchema;
class MapFieldSchema implements FieldSchema {
private final @Nullable FieldSchema itemSchema;
public MapFieldSchema(FieldSchema itemSchema) {
MapFieldSchema(@Nullable FieldSchema itemSchema) {
this.itemSchema = itemSchema;
}
MapFieldSchema() {
this(null);
}
@Override
public JsonNode toJsonNode(ObjectMapper mapper) {
final ObjectNode objectNode = mapper.createObjectNode();
objectNode.set("type", new TextNode(JsonType.Type.OBJECT.getName()));
objectNode.set("additionalProperties", itemSchema.toJsonNode(mapper));
objectNode.set("additionalProperties", itemSchema != null ? itemSchema.toJsonNode(mapper) : BooleanNode.TRUE);
return objectNode;
}
}

View file

@ -9,24 +9,24 @@ import java.util.stream.Collectors;
import reactor.util.function.Tuple2;
import reactor.util.function.Tuples;
public class ObjectFieldSchema implements FieldSchema {
class ObjectFieldSchema implements FieldSchema {
public static final ObjectFieldSchema EMPTY = new ObjectFieldSchema(Map.of(), List.of());
static final ObjectFieldSchema EMPTY = new ObjectFieldSchema(Map.of(), List.of());
private final Map<String, FieldSchema> properties;
private final List<String> required;
public ObjectFieldSchema(Map<String, FieldSchema> properties,
ObjectFieldSchema(Map<String, FieldSchema> properties,
List<String> required) {
this.properties = properties;
this.required = required;
}
public Map<String, FieldSchema> getProperties() {
Map<String, FieldSchema> getProperties() {
return properties;
}
public List<String> getRequired() {
List<String> getRequired() {
return required;
}

View file

@ -5,11 +5,10 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.List;
import java.util.stream.Collectors;
public class OneOfFieldSchema implements FieldSchema {
class OneOfFieldSchema implements FieldSchema {
private final List<FieldSchema> schemaList;
public OneOfFieldSchema(
List<FieldSchema> schemaList) {
OneOfFieldSchema(List<FieldSchema> schemaList) {
this.schemaList = schemaList;
}

View file

@ -94,6 +94,9 @@ public class ProtobufSchemaConverter implements JsonSchemaConverter<Descriptors.
if (wellKnownTypeSchema.isPresent()) {
return wellKnownTypeSchema.get();
}
if (field.isMapField()) {
return new MapFieldSchema();
}
final JsonType jsonType = convertType(field);
FieldSchema fieldSchema;
if (jsonType.getType().equals(JsonType.Type.OBJECT)) {
@ -149,67 +152,47 @@ public class ProtobufSchemaConverter implements JsonSchemaConverter<Descriptors.
}
private JsonType convertType(Descriptors.FieldDescriptor field) {
switch (field.getType()) {
case INT32:
case FIXED32:
case SFIXED32:
case SINT32:
return new SimpleJsonType(
JsonType.Type.INTEGER,
Map.of(
"maximum", IntNode.valueOf(Integer.MAX_VALUE),
"minimum", IntNode.valueOf(Integer.MIN_VALUE)
)
);
case UINT32:
return new SimpleJsonType(
JsonType.Type.INTEGER,
Map.of(
"maximum", LongNode.valueOf(UnsignedInteger.MAX_VALUE.longValue()),
"minimum", IntNode.valueOf(0)
)
);
return switch (field.getType()) {
case INT32, FIXED32, SFIXED32, SINT32 -> new SimpleJsonType(
JsonType.Type.INTEGER,
Map.of(
"maximum", IntNode.valueOf(Integer.MAX_VALUE),
"minimum", IntNode.valueOf(Integer.MIN_VALUE)
)
);
case UINT32 -> new SimpleJsonType(
JsonType.Type.INTEGER,
Map.of(
"maximum", LongNode.valueOf(UnsignedInteger.MAX_VALUE.longValue()),
"minimum", IntNode.valueOf(0)
)
);
//TODO: actually all *64 types will be printed with quotes (as strings),
// see JsonFormat::printSingleFieldValue for impl. This can cause problems when you copy-paste from messages
// table to `Produce` area - need to think if it is critical or not.
case INT64:
case FIXED64:
case SFIXED64:
case SINT64:
return new SimpleJsonType(
JsonType.Type.INTEGER,
Map.of(
"maximum", LongNode.valueOf(Long.MAX_VALUE),
"minimum", LongNode.valueOf(Long.MIN_VALUE)
)
);
case UINT64:
return new SimpleJsonType(
JsonType.Type.INTEGER,
Map.of(
"maximum", new BigIntegerNode(UnsignedLong.MAX_VALUE.bigIntegerValue()),
"minimum", LongNode.valueOf(0)
)
);
case MESSAGE:
case GROUP:
return new SimpleJsonType(JsonType.Type.OBJECT);
case ENUM:
return new EnumJsonType(
field.getEnumType().getValues().stream()
.map(Descriptors.EnumValueDescriptor::getName)
.collect(Collectors.toList())
);
case BYTES:
case STRING:
return new SimpleJsonType(JsonType.Type.STRING);
case FLOAT:
case DOUBLE:
return new SimpleJsonType(JsonType.Type.NUMBER);
case BOOL:
return new SimpleJsonType(JsonType.Type.BOOLEAN);
default:
return new SimpleJsonType(JsonType.Type.STRING);
}
case INT64, FIXED64, SFIXED64, SINT64 -> new SimpleJsonType(
JsonType.Type.INTEGER,
Map.of(
"maximum", LongNode.valueOf(Long.MAX_VALUE),
"minimum", LongNode.valueOf(Long.MIN_VALUE)
)
);
case UINT64 -> new SimpleJsonType(
JsonType.Type.INTEGER,
Map.of(
"maximum", new BigIntegerNode(UnsignedLong.MAX_VALUE.bigIntegerValue()),
"minimum", LongNode.valueOf(0)
)
);
case MESSAGE, GROUP -> new SimpleJsonType(JsonType.Type.OBJECT);
case ENUM -> new EnumJsonType(
field.getEnumType().getValues().stream()
.map(Descriptors.EnumValueDescriptor::getName)
.collect(Collectors.toList())
);
case BYTES, STRING -> new SimpleJsonType(JsonType.Type.STRING);
case FLOAT, DOUBLE -> new SimpleJsonType(JsonType.Type.NUMBER);
case BOOL -> new SimpleJsonType(JsonType.Type.BOOLEAN);
};
}
}

View file

@ -4,10 +4,10 @@ import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.TextNode;
public class RefFieldSchema implements FieldSchema {
class RefFieldSchema implements FieldSchema {
private final String ref;
public RefFieldSchema(String ref) {
RefFieldSchema(String ref) {
this.ref = ref;
}
@ -16,7 +16,7 @@ public class RefFieldSchema implements FieldSchema {
return mapper.createObjectNode().set("$ref", new TextNode(ref));
}
public String getRef() {
String getRef() {
return ref;
}
}

View file

@ -3,10 +3,10 @@ package com.provectus.kafka.ui.util.jsonschema;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
public class SimpleFieldSchema implements FieldSchema {
class SimpleFieldSchema implements FieldSchema {
private final JsonType type;
public SimpleFieldSchema(JsonType type) {
SimpleFieldSchema(JsonType type) {
this.type = type;
}

View file

@ -6,15 +6,15 @@ import com.fasterxml.jackson.databind.node.TextNode;
import com.google.common.collect.ImmutableMap;
import java.util.Map;
public class SimpleJsonType extends JsonType {
class SimpleJsonType extends JsonType {
private final Map<String, JsonNode> additionalTypeProperties;
public SimpleJsonType(Type type) {
SimpleJsonType(Type type) {
this(type, Map.of());
}
public SimpleJsonType(Type type, Map<String, JsonNode> additionalTypeProperties) {
SimpleJsonType(Type type, Map<String, JsonNode> additionalTypeProperties) {
super(type);
this.additionalTypeProperties = additionalTypeProperties;
}

View file

@ -2,6 +2,7 @@ package com.provectus.kafka.ui;
import com.provectus.kafka.ui.container.KafkaConnectContainer;
import com.provectus.kafka.ui.container.SchemaRegistryContainer;
import java.nio.file.Path;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
@ -9,6 +10,7 @@ import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.jetbrains.annotations.NotNull;
import org.junit.jupiter.api.function.ThrowingConsumer;
import org.junit.jupiter.api.io.TempDir;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.autoconfigure.web.reactive.AutoConfigureWebTestClient;
import org.springframework.boot.test.context.SpringBootTest;
@ -47,6 +49,9 @@ public abstract class AbstractIntegrationTest {
.dependsOn(kafka)
.dependsOn(schemaRegistry);
@TempDir
public static Path tmpDir;
static {
kafka.start();
schemaRegistry.start();
@ -76,6 +81,9 @@ public abstract class AbstractIntegrationTest {
System.setProperty("kafka.clusters.1.schemaRegistry", schemaRegistry.getUrl());
System.setProperty("kafka.clusters.1.kafkaConnect.0.name", "kafka-connect");
System.setProperty("kafka.clusters.1.kafkaConnect.0.address", kafkaConnect.getTarget());
System.setProperty("dynamic.config.enabled", "true");
System.setProperty("config.related.uploads.dir", tmpDir.toString());
}
}

View file

@ -0,0 +1,49 @@
package com.provectus.kafka.ui.controller;
import static org.assertj.core.api.Assertions.assertThat;
import com.provectus.kafka.ui.AbstractIntegrationTest;
import com.provectus.kafka.ui.model.UploadedFileInfoDTO;
import java.io.IOException;
import java.nio.file.Path;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.io.ClassPathResource;
import org.springframework.http.HttpEntity;
import org.springframework.http.client.MultipartBodyBuilder;
import org.springframework.test.web.reactive.server.WebTestClient;
import org.springframework.util.MultiValueMap;
class ApplicationConfigControllerTest extends AbstractIntegrationTest {
@Autowired
private WebTestClient webTestClient;
@Test
public void testUpload() throws IOException {
var fileToUpload = new ClassPathResource("/fileForUploadTest.txt", this.getClass());
UploadedFileInfoDTO result = webTestClient
.post()
.uri("/api/config/relatedfiles")
.bodyValue(generateBody(fileToUpload))
.exchange()
.expectStatus()
.isOk()
.expectBody(UploadedFileInfoDTO.class)
.returnResult()
.getResponseBody();
assertThat(result).isNotNull();
assertThat(result.getLocation()).isNotNull();
assertThat(Path.of(result.getLocation()))
.hasSameBinaryContentAs(fileToUpload.getFile().toPath());
}
private MultiValueMap<String, HttpEntity<?>> generateBody(ClassPathResource resource) {
MultipartBodyBuilder builder = new MultipartBodyBuilder();
builder.part("file", resource);
return builder.build();
}
}

Some files were not shown because too many files have changed in this diff Show more