Merge branch 'master' into issues/clearResultsFunctionalCheck4

# Conflicts:
#	kafka-ui-e2e-checks/src/main/java/com/provectus/kafka/ui/pages/ksqldb/KsqlQueryForm.java
#	kafka-ui-e2e-checks/src/test/java/com/provectus/kafka/ui/manualsuite/backlog/SmokeBacklog.java
#	kafka-ui-e2e-checks/src/test/java/com/provectus/kafka/ui/smokesuite/ksqldb/KsqlDbTest.java
commit e444e85eb3
VladSenyuta 2023-05-08 10:08:56 +03:00
133 changed files with 2538 additions and 965 deletions

.github/ISSUE_TEMPLATE/bug.yml (new file, 92 lines)

@@ -0,0 +1,92 @@
name: "\U0001F41E Bug report"
description: File a bug report
labels: ["status/triage", "type/bug"]
assignees: []
body:
- type: markdown
attributes:
value: |
Hi, thanks for raising the issue(-s), all contributions really matter!
Please, note that we'll close the issue without further explanation if you don't follow
this template and don't provide the information requested within this template.
- type: checkboxes
id: terms
attributes:
label: Issue submitter TODO list
description: By you checking these checkboxes we can be sure you've done the essential things.
options:
- label: I've looked up my issue in [FAQ](https://docs.kafka-ui.provectus.io/faq/common-problems)
required: true
- label: I've searched for an already existing issues [here](https://github.com/provectus/kafka-ui/issues)
required: true
- label: I've tried running `master`-labeled docker image and the issue still persists there
required: true
- label: I'm running a supported version of the application which is listed [here](https://github.com/provectus/kafka-ui/blob/master/SECURITY.md)
required: true
- type: textarea
attributes:
label: Describe the bug (actual behavior)
description: A clear and concise description of what the bug is. Use a list, if there is more than one problem
validations:
required: true
- type: textarea
attributes:
label: Expected behavior
description: A clear and concise description of what you expected to happen
validations:
required: false
- type: textarea
attributes:
label: Your installation details
description: |
How do you run the app? Please provide as much info as possible:
1. App version (commit hash in the top left corner of the UI)
2. Helm chart version, if you use one
3. Your application config. Please remove the sensitive info like passwords or API keys.
4. Any IAAC configs
validations:
required: true
- type: textarea
attributes:
label: Steps to reproduce
description: |
Please write down the order of the actions required to reproduce the issue.
For the advanced setups/complicated issue, we might need you to provide
a minimal [reproducible example](https://stackoverflow.com/help/minimal-reproducible-example).
validations:
required: true
- type: textarea
attributes:
label: Screenshots
description: |
If applicable, add screenshots to help explain your problem
validations:
required: false
- type: textarea
attributes:
label: Logs
description: |
If applicable, *upload* screenshots to help explain your problem
validations:
required: false
- type: textarea
attributes:
label: Additional context
description: |
Add any other context about the problem here. E.G.:
1. Are there any alternative scenarios (different data/methods/configuration/setup) you have tried?
Were they successful or the same issue occurred? Please provide steps as well.
2. Related issues (if there are any).
3. Logs (if available)
4. Is there any serious impact or behaviour on the end-user because of this issue, that can be overlooked?
validations:
required: false

@@ -1,64 +0,0 @@
---
name: "\U0001F41E Bug report"
about: Create a bug report
title: ''
labels: status/triage, type/bug
assignees: ''
---
<!--
We will close the issue without further explanation if you don't follow this template and don't provide the information requested within this template.
Don't forget to check for existing issues/discussions regarding your proposal. We might already have it.
https://github.com/provectus/kafka-ui/issues
https://github.com/provectus/kafka-ui/discussions
-->
<!--
Please follow the naming conventions for bugs:
<Feature/Area/Scope> : <Compact, but specific problem summary>
Avoid generic titles, like “Topics: incorrect layout of message sorting drop-down list”. Better use something like: “Topics: Message sorting drop-down list overlaps the "Submit" button”.
-->
**Describe the bug** (Actual behavior)
<!--(A clear and concise description of what the bug is.Use a list, if there is more than one problem)-->
**Expected behavior**
<!--(A clear and concise description of what you expected to happen.)-->
**Set up**
<!--
WE MIGHT CLOSE THE ISSUE without further explanation IF YOU DON'T PROVIDE THIS INFORMATION.
How do you run the app? Please provide as much info as possible:
1. App version (docker image version or check commit hash in the top left corner in UI)
2. Helm chart version, if you use one
3. Any IAAC configs
-->
**Steps to Reproduce**
<!-- We'd like you to provide an example setup (via docker-compose, helm, etc.)
to reproduce the problem, especially with a complex setups. -->
1.
**Screenshots**
<!--
(If applicable, add screenshots to help explain your problem)
-->
**Additional context**
<!--
Add any other context about the problem here. E.g.:
1. Are there any alternative scenarios (different data/methods/configuration/setup) you have tried?
Were they successfull or same issue occured? Please provide steps as well.
2. Related issues (if there are any).
3. Logs (if available)
4. Is there any serious impact or behaviour on the end-user because of this issue, that can be overlooked?
-->

.github/ISSUE_TEMPLATE/config.yml (new file, 11 lines)

@@ -0,0 +1,11 @@
blank_issues_enabled: false
contact_links:
- name: Official documentation
url: https://docs.kafka-ui.provectus.io/
about: Before reaching out for support, please refer to our documentation. Read "FAQ" and "Common problems", also try using search there.
- name: Community Discord
url: https://discord.gg/4DWzD7pGE5
about: Chat with other users, get some support or ask questions.
- name: GitHub Discussions
url: https://github.com/provectus/kafka-ui/discussions
about: An alternative place to ask questions or to get some support.

.github/ISSUE_TEMPLATE/feature.yml (new file, 66 lines)

@@ -0,0 +1,66 @@
name: "\U0001F680 Feature request"
description: Propose a new feature
labels: ["status/triage", "type/feature"]
assignees: []
body:
- type: markdown
attributes:
value: |
Hi, thanks for raising the issue(-s), all contributions really matter!
Please, note that we'll close the issue without further explanation if you don't follow
this template and don't provide the information requested within this template.
- type: checkboxes
id: terms
attributes:
label: Issue submitter TODO list
description: By you checking these checkboxes we can be sure you've done the essential things.
options:
- label: I've searched for an already existing issues [here](https://github.com/provectus/kafka-ui/issues)
required: true
- label: I'm running a supported version of the application which is listed [here](https://github.com/provectus/kafka-ui/blob/master/SECURITY.md) and the feature is not present there
required: true
- type: textarea
attributes:
label: Is your proposal related to a problem?
description: |
Provide a clear and concise description of what the problem is.
For example, "I'm always frustrated when..."
validations:
required: false
- type: textarea
attributes:
label: Describe the feature you're interested in
description: |
Provide a clear and concise description of what you want to happen.
validations:
required: true
- type: textarea
attributes:
label: Describe alternatives you've considered
description: |
Let us know about other solutions you've tried or researched.
validations:
required: false
- type: input
attributes:
label: Version you're running
description: |
Please provide the app version you're currently running:
1. App version (commit hash in the top left corner of the UI)
validations:
required: true
- type: textarea
attributes:
label: Additional context
description: |
Is there anything else you can add about the proposal?
You might want to link to related issues here, if you haven't already.
validations:
required: false

@@ -1,46 +0,0 @@
---
name: "\U0001F680 Feature request"
about: Propose a new feature
title: ''
labels: status/triage, type/feature
assignees: ''
---
<!--
Don't forget to check for existing issues/discussions regarding your proposal. We might already have it.
https://github.com/provectus/kafka-ui/issues
https://github.com/provectus/kafka-ui/discussions
-->
### Which version of the app are you running?
<!-- Please provide docker image version or check commit hash in the top left corner in UI) -->
### Is your proposal related to a problem?
<!--
Provide a clear and concise description of what the problem is.
For example, "I'm always frustrated when..."
-->
### Describe the solution you'd like
<!--
Provide a clear and concise description of what you want to happen.
-->
### Describe alternatives you've considered
<!--
Let us know about other solutions you've tried or researched.
-->
### Additional context
<!--
Is there anything else you can add about the proposal?
You might want to link to related issues here, if you haven't already.
-->

.github/ISSUE_TEMPLATE/helm.yml (new file, 92 lines)

@@ -0,0 +1,92 @@
name: "⎈ K8s/Helm problem report"
description: "Report a problem with k8s/helm charts/etc"
labels: ["status/triage", "scope/k8s"]
assignees: []
body:
- type: markdown
attributes:
value: |
Hi, thanks for raising the issue(-s), all contributions really matter!
Please, note that we'll close the issue without further explanation if you don't follow
this template and don't provide the information requested within this template.
- type: checkboxes
id: terms
attributes:
label: Issue submitter TODO list
description: By you checking these checkboxes we can be sure you've done the essential things.
options:
- label: I've looked up my issue in [FAQ](https://docs.kafka-ui.provectus.io/faq/common-problems)
required: true
- label: I've searched for an already existing issues [here](https://github.com/provectus/kafka-ui/issues)
required: true
- label: I've tried running `master`-labeled docker image and the issue still persists there
required: true
- label: I'm running a supported version of the application which is listed [here](https://github.com/provectus/kafka-ui/blob/master/SECURITY.md)
required: true
- type: textarea
attributes:
label: Describe the bug (actual behavior)
description: A clear and concise description of what the bug is. Use a list, if there is more than one problem
validations:
required: true
- type: textarea
attributes:
label: Expected behavior
description: A clear and concise description of what you expected to happen
validations:
required: false
- type: textarea
attributes:
label: Your installation details
description: |
How do you run the app? Please provide as much info as possible:
1. App version (commit hash in the top left corner of the UI)
2. Helm chart version
3. Your application config. Please remove the sensitive info like passwords or API keys.
4. Any IAAC configs
validations:
required: true
- type: textarea
attributes:
label: Steps to reproduce
description: |
Please write down the order of the actions required to reproduce the issue.
For the advanced setups/complicated issue, we might need you to provide
a minimal [reproducible example](https://stackoverflow.com/help/minimal-reproducible-example).
validations:
required: true
- type: textarea
attributes:
label: Screenshots
description: |
If applicable, add screenshots to help explain your problem
validations:
required: false
- type: textarea
attributes:
label: Logs
description: |
If applicable, *upload* screenshots to help explain your problem
validations:
required: false
- type: textarea
attributes:
label: Additional context
description: |
Add any other context about the problem here. E.G.:
1. Are there any alternative scenarios (different data/methods/configuration/setup) you have tried?
Were they successful or the same issue occurred? Please provide steps as well.
2. Related issues (if there are any).
3. Logs (if available)
4. Is there any serious impact or behaviour on the end-user because of this issue, that can be overlooked?
validations:
required: false

@@ -1,52 +0,0 @@
---
name: "⎈ K8s/Helm problem report"
about: Report a problem with k8s/helm charts/etc
title: ''
labels: scope/k8s, status/triage
assignees: azatsafin
---
<!--
Don't forget to check for existing issues/discussions regarding your proposal. We might already have it.
https://github.com/provectus/kafka-ui/issues
https://github.com/provectus/kafka-ui/discussions
-->
**Describe the bug**
<!--(A clear and concise description of what the bug is.)-->
**Set up**
<!--
How do you run the app? Please provide as much info as possible:
1. App version (docker image version or check commit hash in the top left corner in UI)
2. Helm chart version, if you use one
3. Any IAAC configs
We might close the issue without further explanation if you don't provide such information.
-->
**Steps to Reproduce**
Steps to reproduce the behavior:
1.
**Expected behavior**
<!--
(A clear and concise description of what you expected to happen)
-->
**Screenshots**
<!--
(If applicable, add screenshots to help explain your problem)
-->
**Additional context**
<!--
(Add any other context about the problem here)
-->

@@ -1,16 +0,0 @@
---
name: "❓ Question"
about: Ask a question
title: ''
---
<!--
To ask a question, please either:
1. Open up a discussion (https://github.com/provectus/kafka-ui/discussions)
2. Join us on discord (https://discord.gg/4DWzD7pGE5) and ask there.
Don't forget to check/search for existing issues/discussions.
-->

@@ -8,8 +8,6 @@ updates:
timezone: Europe/Moscow
reviewers:
- "Haarolean"
assignees:
- "Haarolean"
labels:
- "scope/backend"
- "type/dependencies"
@@ -99,8 +97,6 @@ updates:
timezone: Europe/Moscow
reviewers:
- "Haarolean"
assignees:
- "Haarolean"
labels:
- "scope/infrastructure"
- "type/dependencies"

@@ -6,7 +6,7 @@ jobs:
block_merge:
runs-on: ubuntu-latest
steps:
- uses: mheap/github-action-required-labels@v3
- uses: mheap/github-action-required-labels@v4
with:
mode: exactly
count: 0

@@ -86,7 +86,7 @@ jobs:
- name: make comment with private deployment link
if: ${{ github.event.label.name == 'status/feature_testing' }}
uses: peter-evans/create-or-update-comment@v2
uses: peter-evans/create-or-update-comment@v3
with:
issue-number: ${{ github.event.pull_request.number }}
body: |
@@ -94,7 +94,7 @@ jobs:
- name: make comment with public deployment link
if: ${{ github.event.label.name == 'status/feature_testing_public' }}
uses: peter-evans/create-or-update-comment@v2
uses: peter-evans/create-or-update-comment@v3
with:
issue-number: ${{ github.event.pull_request.number }}
body: |

@@ -21,7 +21,7 @@ jobs:
git add ../kafka-ui-from-branch/
git commit -m "removed env:${{ needs.build.outputs.deploy }}" && git push || true
- name: make comment with deployment link
uses: peter-evans/create-or-update-comment@v2
uses: peter-evans/create-or-update-comment@v3
with:
issue-number: ${{ github.event.pull_request.number }}
body: |

@@ -65,7 +65,7 @@ jobs:
cache-from: type=local,src=/tmp/.buildx-cache
cache-to: type=local,dest=/tmp/.buildx-cache
- name: make comment with private deployment link
uses: peter-evans/create-or-update-comment@v2
uses: peter-evans/create-or-update-comment@v3
with:
issue-number: ${{ github.event.pull_request.number }}
body: |

@@ -55,7 +55,7 @@ jobs:
cache-to: type=local,dest=/tmp/.buildx-cache
- name: Run CVE checks
uses: aquasecurity/trivy-action@0.9.2
uses: aquasecurity/trivy-action@0.10.0
with:
image-ref: "provectuslabs/kafka-ui:${{ steps.build.outputs.version }}"
format: "table"

@@ -33,7 +33,7 @@ jobs:
--image-ids imageTag=${{ steps.extract_branch.outputs.tag }} \
--region us-east-1
- name: make comment with private deployment link
uses: peter-evans/create-or-update-comment@v2
uses: peter-evans/create-or-update-comment@v3
with:
issue-number: ${{ github.event.pull_request.number }}
body: |

@@ -7,7 +7,7 @@ jobs:
stale:
runs-on: ubuntu-latest
steps:
- uses: actions/stale@v7
- uses: actions/stale@v8
with:
days-before-issue-stale: 7
days-before-issue-close: 3

@@ -6,8 +6,9 @@ Following versions of the project are currently being supported with security updates
| Version | Supported |
| ------- | ------------------ |
| 0.5.x | :white_check_mark: |
| 0.4.x | :x: |
| 0.6.x | :white_check_mark: |
| 0.5.x | :x: |
| 0.4.x | :x: |
| 0.3.x | :x: |
| 0.2.x | :x: |
| 0.1.x | :x: |

@@ -2,6 +2,6 @@ apiVersion: v2
name: kafka-ui
description: A Helm chart for kafka-UI
type: application
version: 0.6.1
appVersion: v0.6.1
version: 0.6.2
appVersion: v0.6.2
icon: https://github.com/provectus/kafka-ui/raw/master/documentation/images/kafka-ui-logo.png

@@ -11,4 +11,8 @@ KafkaClient {
user_admin="admin-secret";
};
Client {};
Client {
org.apache.zookeeper.server.auth.DigestLoginModule required
username="zkuser"
password="zkuserpassword";
};

@@ -0,0 +1,4 @@
Server {
org.apache.zookeeper.server.auth.DigestLoginModule required
user_zkuser="zkuserpassword";
};

@@ -0,0 +1,59 @@
---
version: '2'
services:
kafka-ui:
container_name: kafka-ui
image: provectuslabs/kafka-ui:latest
ports:
- 8080:8080
depends_on:
- zookeeper
- kafka
environment:
KAFKA_CLUSTERS_0_NAME: local
KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:29092
KAFKA_CLUSTERS_0_PROPERTIES_SECURITY_PROTOCOL: SASL_PLAINTEXT
KAFKA_CLUSTERS_0_PROPERTIES_SASL_MECHANISM: PLAIN
KAFKA_CLUSTERS_0_PROPERTIES_SASL_JAAS_CONFIG: 'org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin-secret";'
zookeeper:
image: wurstmeister/zookeeper:3.4.6
environment:
JVMFLAGS: "-Djava.security.auth.login.config=/etc/zookeeper/zookeeper_jaas.conf"
volumes:
- ./jaas/zookeeper_jaas.conf:/etc/zookeeper/zookeeper_jaas.conf
ports:
- 2181:2181
kafka:
image: confluentinc/cp-kafka:7.2.1
hostname: kafka
container_name: kafka
ports:
- "9092:9092"
- "9997:9997"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,SASL_PLAINTEXT:SASL_PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT'
KAFKA_ADVERTISED_LISTENERS: 'SASL_PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092'
KAFKA_OPTS: "-Djava.security.auth.login.config=/etc/kafka/jaas/kafka_server.conf"
KAFKA_AUTHORIZER_CLASS_NAME: "kafka.security.authorizer.AclAuthorizer"
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_JMX_PORT: 9997
KAFKA_JMX_HOSTNAME: localhost
KAFKA_NODE_ID: 1
KAFKA_CONTROLLER_QUORUM_VOTERS: '1@kafka:29093'
KAFKA_LISTENERS: 'SASL_PLAINTEXT://kafka:29092,CONTROLLER://kafka:29093,PLAINTEXT_HOST://0.0.0.0:9092'
KAFKA_INTER_BROKER_LISTENER_NAME: 'SASL_PLAINTEXT'
KAFKA_SASL_ENABLED_MECHANISMS: 'PLAIN'
KAFKA_SASL_MECHANISM_INTER_BROKER_PROTOCOL: 'PLAIN'
KAFKA_SECURITY_PROTOCOL: 'SASL_PLAINTEXT'
KAFKA_SUPER_USERS: 'User:admin'
volumes:
- ./scripts/update_run.sh:/tmp/update_run.sh
- ./jaas:/etc/kafka/jaas

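For reference, a minimal sketch (not part of this commit) of a plain Java client connecting with the same SASL/PLAIN settings this compose file passes to kafka-ui; the bootstrap address and credentials are copied from the environment above and assume the client runs inside the compose network:

    import java.util.Properties;
    import org.apache.kafka.clients.CommonClientConfigs;
    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.common.config.SaslConfigs;

    public class SaslPlainClientExample {
      public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // kafka:29092 is the SASL_PLAINTEXT listener declared in KAFKA_ADVERTISED_LISTENERS
        props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "kafka:29092");
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
        props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
        // Same credentials the compose file gives to kafka-ui
        props.put(SaslConfigs.SASL_JAAS_CONFIG,
            "org.apache.kafka.common.security.plain.PlainLoginModule required "
                + "username=\"admin\" password=\"admin-secret\";");
        try (AdminClient client = AdminClient.create(props)) {
          System.out.println(client.listTopics().names().get());
        }
      }
    }
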
@@ -9,4 +9,6 @@ message MySpecificTopicValue {
message MyValue {
int32 version = 1;
string payload = 2;
map<int32, string> intToStringMap = 3;
map<string, MyValue> strToObjMap = 4;
}

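To illustrate the two new map fields, a hypothetical snippet against the Java classes protoc would generate for this schema (protobuf map fields get put<FieldName> builder methods); MyValue here is the generated type, not code from this commit:

    MyValue value = MyValue.newBuilder()
        .setVersion(1)
        .setPayload("hello")
        .putIntToStringMap(1, "one")                       // map<int32, string>
        .putStrToObjMap("nested",                          // map<string, MyValue>
            MyValue.newBuilder().setVersion(2).setPayload("inner").build())
        .build();
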
@@ -12,7 +12,7 @@
<artifactId>kafka-ui-api</artifactId>
<properties>
<jacoco.version>0.8.8</jacoco.version>
<jacoco.version>0.8.10</jacoco.version>
<sonar.java.coveragePlugin>jacoco</sonar.java.coveragePlugin>
<sonar.dynamicAnalysis>reuseReports</sonar.dynamicAnalysis>
<sonar.jacoco.reportPath>${project.basedir}/target/jacoco.exec</sonar.jacoco.reportPath>
@@ -21,6 +21,12 @@
</properties>
<dependencies>
<dependency>
<!--TODO: remove, when spring-boot fixed dependency to 6.0.8+ (6.0.7 has CVE) -->
<groupId>org.springframework</groupId>
<artifactId>spring-core</artifactId>
<version>6.0.8</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
@@ -109,6 +115,12 @@
<groupId>io.projectreactor.addons</groupId>
<artifactId>reactor-extra</artifactId>
</dependency>
<!-- https://github.com/provectus/kafka-ui/pull/3693 -->
<dependency>
<groupId>org.json</groupId>
<artifactId>json</artifactId>
<version>${org.json.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>

@@ -27,6 +27,8 @@ public class ClustersProperties {
String internalTopicPrefix;
Integer adminClientTimeout;
PollingProperties polling = new PollingProperties();
@Data
@@ -56,6 +58,8 @@ public class ClustersProperties {
Integer pollTimeoutMs;
Integer partitionPollTimeout;
Integer noDataEmptyPolls;
Integer maxPageSize;
Integer defaultPageSize;
}
@Data
@@ -127,8 +131,9 @@ public class ClustersProperties {
@Data
public static class Masking {
Type type;
List<String> fields; //if null or empty list - policy will be applied to all fields
List<String> pattern; //used when type=MASK
List<String> fields;
String fieldsNamePattern;
List<String> maskingCharsReplacement; //used when type=MASK
String replacement; //used when type=REPLACE
String topicKeysPattern;
String topicValuesPattern;

@@ -5,7 +5,6 @@ import java.util.Map;
import lombok.AllArgsConstructor;
import org.openapitools.jackson.nullable.JsonNullableModule;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.web.ServerProperties;
import org.springframework.boot.autoconfigure.web.reactive.WebFluxProperties;
import org.springframework.context.ApplicationContext;
@@ -15,8 +14,6 @@ import org.springframework.http.server.reactive.ContextPathCompositeHandler;
import org.springframework.http.server.reactive.HttpHandler;
import org.springframework.jmx.export.MBeanExporter;
import org.springframework.util.StringUtils;
import org.springframework.util.unit.DataSize;
import org.springframework.web.reactive.function.client.WebClient;
import org.springframework.web.server.adapter.WebHttpHandlerBuilder;
@Configuration
@@ -52,14 +49,7 @@ public class Config {
}
@Bean
public WebClient webClient(
@Value("${webclient.max-in-memory-buffer-size:20MB}") DataSize maxBuffSize) {
return WebClient.builder()
.codecs(c -> c.defaultCodecs().maxInMemorySize((int) maxBuffSize.toBytes()))
.build();
}
@Bean
// will be used by webflux json mapping
public JsonNullableModule jsonNullableModule() {
return new JsonNullableModule();
}

@@ -0,0 +1,33 @@
package com.provectus.kafka.ui.config;
import com.provectus.kafka.ui.exception.ValidationException;
import java.beans.Transient;
import javax.annotation.PostConstruct;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
import org.springframework.util.unit.DataSize;
@Configuration
@ConfigurationProperties("webclient")
@Data
public class WebclientProperties {
String maxInMemoryBufferSize;
@PostConstruct
public void validate() {
validateAndSetDefaultBufferSize();
}
private void validateAndSetDefaultBufferSize() {
if (maxInMemoryBufferSize != null) {
try {
DataSize.parse(maxInMemoryBufferSize);
} catch (Exception e) {
throw new ValidationException("Invalid format for webclient.maxInMemoryBufferSize");
}
}
}
}

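As a quick illustration of what the new validator accepts: Spring's DataSize grammar is a number plus an optional unit (B, KB, MB, GB, TB). A minimal sketch, using "20MB" since that was the previous hardcoded default:

    import org.springframework.util.unit.DataSize;

    public class BufferSizeCheck {
      public static void main(String[] args) {
        // "20MB" parses to 20 * 1024 * 1024 bytes
        System.out.println(DataSize.parse("20MB").toBytes()); // 20971520
        try {
          // Anything outside the pattern throws IllegalArgumentException,
          // which validateAndSetDefaultBufferSize() rewraps as a ValidationException
          DataSize.parse("20 megabytes");
        } catch (IllegalArgumentException e) {
          System.out.println("rejected: " + e.getMessage());
        }
      }
    }
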
@@ -0,0 +1,26 @@
package com.provectus.kafka.ui.config.auth;
import lombok.Data;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.properties.ConfigurationProperties;
@ConfigurationProperties("spring.ldap")
@Data
public class LdapProperties {
private String urls;
private String base;
private String adminUser;
private String adminPassword;
private String userFilterSearchBase;
private String userFilterSearchFilter;
@Value("${oauth2.ldap.activeDirectory:false}")
private boolean isActiveDirectory;
@Value("${oauth2.ldap.aсtiveDirectory.domain:@null}")
private String activeDirectoryDomain;
@Value("${oauth2.ldap.groupRoleAttribute:cn}")
private String groupRoleAttribute;
}

@@ -1,13 +1,23 @@
package com.provectus.kafka.ui.config.auth;
import static com.provectus.kafka.ui.config.auth.AbstractAuthSecurityConfig.AUTH_WHITELIST;
import com.provectus.kafka.ui.service.rbac.AccessControlService;
import com.provectus.kafka.ui.service.rbac.extractor.RbacLdapAuthoritiesExtractor;
import java.util.Collection;
import java.util.List;
import javax.annotation.Nullable;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.autoconfigure.ldap.LdapAutoConfiguration;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.context.annotation.Primary;
import org.springframework.ldap.core.DirContextOperations;
import org.springframework.ldap.core.support.BaseLdapPathContextSource;
import org.springframework.ldap.core.support.LdapContextSource;
import org.springframework.security.authentication.AuthenticationManager;
@@ -16,70 +26,71 @@ import org.springframework.security.authentication.ReactiveAuthenticationManager
import org.springframework.security.authentication.ReactiveAuthenticationManagerAdapter;
import org.springframework.security.config.annotation.web.reactive.EnableWebFluxSecurity;
import org.springframework.security.config.web.server.ServerHttpSecurity;
import org.springframework.security.core.GrantedAuthority;
import org.springframework.security.core.userdetails.UserDetails;
import org.springframework.security.ldap.authentication.AbstractLdapAuthenticationProvider;
import org.springframework.security.ldap.authentication.BindAuthenticator;
import org.springframework.security.ldap.authentication.LdapAuthenticationProvider;
import org.springframework.security.ldap.authentication.ad.ActiveDirectoryLdapAuthenticationProvider;
import org.springframework.security.ldap.search.FilterBasedLdapUserSearch;
import org.springframework.security.ldap.search.LdapUserSearch;
import org.springframework.security.ldap.userdetails.LdapUserDetailsMapper;
import org.springframework.security.web.server.SecurityWebFilterChain;
@Configuration
@EnableWebFluxSecurity
@ConditionalOnProperty(value = "auth.type", havingValue = "LDAP")
@Import(LdapAutoConfiguration.class)
@EnableConfigurationProperties(LdapProperties.class)
@RequiredArgsConstructor
@Slf4j
public class LdapSecurityConfig extends AbstractAuthSecurityConfig {
public class LdapSecurityConfig {
@Value("${spring.ldap.urls}")
private String ldapUrls;
@Value("${spring.ldap.dn.pattern:#{null}}")
private String ldapUserDnPattern;
@Value("${spring.ldap.adminUser:#{null}}")
private String adminUser;
@Value("${spring.ldap.adminPassword:#{null}}")
private String adminPassword;
@Value("${spring.ldap.userFilter.searchBase:#{null}}")
private String userFilterSearchBase;
@Value("${spring.ldap.userFilter.searchFilter:#{null}}")
private String userFilterSearchFilter;
@Value("${oauth2.ldap.activeDirectory:false}")
private boolean isActiveDirectory;
@Value("${oauth2.ldap.aсtiveDirectory.domain:#{null}}")
private String activeDirectoryDomain;
private final LdapProperties props;
@Bean
public ReactiveAuthenticationManager authenticationManager(BaseLdapPathContextSource contextSource) {
public ReactiveAuthenticationManager authenticationManager(BaseLdapPathContextSource contextSource,
ApplicationContext context,
@Nullable AccessControlService acs) {
var rbacEnabled = acs != null && acs.isRbacEnabled();
BindAuthenticator ba = new BindAuthenticator(contextSource);
if (ldapUserDnPattern != null) {
ba.setUserDnPatterns(new String[] {ldapUserDnPattern});
if (props.getBase() != null) {
ba.setUserDnPatterns(new String[] {props.getBase()});
}
if (userFilterSearchFilter != null) {
if (props.getUserFilterSearchFilter() != null) {
LdapUserSearch userSearch =
new FilterBasedLdapUserSearch(userFilterSearchBase, userFilterSearchFilter, contextSource);
new FilterBasedLdapUserSearch(props.getUserFilterSearchBase(), props.getUserFilterSearchFilter(),
contextSource);
ba.setUserSearch(userSearch);
}
AbstractLdapAuthenticationProvider authenticationProvider;
if (!isActiveDirectory) {
authenticationProvider = new LdapAuthenticationProvider(ba);
if (!props.isActiveDirectory()) {
authenticationProvider = rbacEnabled
? new LdapAuthenticationProvider(ba, new RbacLdapAuthoritiesExtractor(context))
: new LdapAuthenticationProvider(ba);
} else {
authenticationProvider = new ActiveDirectoryLdapAuthenticationProvider(activeDirectoryDomain, ldapUrls);
authenticationProvider = new ActiveDirectoryLdapAuthenticationProvider(props.getActiveDirectoryDomain(),
props.getUrls()); // TODO Issue #3741
authenticationProvider.setUseAuthenticationRequestCredentials(true);
}
if (rbacEnabled) {
authenticationProvider.setUserDetailsContextMapper(new UserDetailsMapper());
}
AuthenticationManager am = new ProviderManager(List.of(authenticationProvider));
return new ReactiveAuthenticationManagerAdapter(am);
}
@Bean
@Primary
public BaseLdapPathContextSource contextSource() {
LdapContextSource ctx = new LdapContextSource();
ctx.setUrl(ldapUrls);
ctx.setUserDn(adminUser);
ctx.setPassword(adminPassword);
ctx.setUrl(props.getUrls());
ctx.setUserDn(props.getAdminUser());
ctx.setPassword(props.getAdminPassword());
ctx.afterPropertiesSet();
return ctx;
}
@@ -87,20 +98,35 @@ public class LdapSecurityConfig extends AbstractAuthSecurityConfig {
@Bean
public SecurityWebFilterChain configureLdap(ServerHttpSecurity http) {
log.info("Configuring LDAP authentication.");
if (isActiveDirectory) {
if (props.isActiveDirectory()) {
log.info("Active Directory support for LDAP has been enabled.");
}
http
return http
.authorizeExchange()
.pathMatchers(AUTH_WHITELIST)
.permitAll()
.anyExchange()
.authenticated()
.and()
.httpBasic();
return http.csrf().disable().build();
.and()
.formLogin()
.and()
.logout()
.and()
.csrf().disable()
.build();
}
private static class UserDetailsMapper extends LdapUserDetailsMapper {
@Override
public UserDetails mapUserFromContext(DirContextOperations ctx, String username,
Collection<? extends GrantedAuthority> authorities) {
UserDetails userDetails = super.mapUserFromContext(ctx, username, authorities);
return new RbacLdapUser(userDetails);
}
}
}

@@ -115,7 +115,7 @@ public class OAuthSecurityConfig extends AbstractAuthSecurityConfig {
@Nullable
private ProviderAuthorityExtractor getExtractor(final String providerId, AccessControlService acs) {
final String provider = getProviderByProviderId(providerId);
Optional<ProviderAuthorityExtractor> extractor = acs.getExtractors()
Optional<ProviderAuthorityExtractor> extractor = acs.getOauthExtractors()
.stream()
.filter(e -> e.isApplicable(provider))
.findFirst();

@@ -0,0 +1,60 @@
package com.provectus.kafka.ui.config.auth;
import java.util.Collection;
import java.util.stream.Collectors;
import org.springframework.security.core.GrantedAuthority;
import org.springframework.security.core.userdetails.UserDetails;
public class RbacLdapUser implements UserDetails, RbacUser {
private final UserDetails userDetails;
public RbacLdapUser(UserDetails userDetails) {
this.userDetails = userDetails;
}
@Override
public String name() {
return userDetails.getUsername();
}
@Override
public Collection<String> groups() {
return userDetails.getAuthorities().stream().map(GrantedAuthority::getAuthority).collect(Collectors.toSet());
}
@Override
public Collection<? extends GrantedAuthority> getAuthorities() {
return userDetails.getAuthorities();
}
@Override
public String getPassword() {
return userDetails.getPassword();
}
@Override
public String getUsername() {
return userDetails.getUsername();
}
@Override
public boolean isAccountNonExpired() {
return userDetails.isAccountNonExpired();
}
@Override
public boolean isAccountNonLocked() {
return userDetails.isAccountNonLocked();
}
@Override
public boolean isCredentialsNonExpired() {
return userDetails.isCredentialsNonExpired();
}
@Override
public boolean isEnabled() {
return userDetails.isEnabled();
}
}

@@ -0,0 +1,21 @@
package com.provectus.kafka.ui.config.auth.condition;
import org.springframework.boot.autoconfigure.condition.AllNestedConditions;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
public class ActiveDirectoryCondition extends AllNestedConditions {
public ActiveDirectoryCondition() {
super(ConfigurationPhase.PARSE_CONFIGURATION);
}
@ConditionalOnProperty(value = "auth.type", havingValue = "LDAP")
public static class OnAuthType {
}
@ConditionalOnProperty(value = "${oauth2.ldap.activeDirectory}:false", havingValue = "true", matchIfMissing = false)
public static class OnActiveDirectory {
}
}

@@ -0,0 +1,115 @@
package com.provectus.kafka.ui.controller;
import com.provectus.kafka.ui.api.AclsApi;
import com.provectus.kafka.ui.mapper.ClusterMapper;
import com.provectus.kafka.ui.model.KafkaAclDTO;
import com.provectus.kafka.ui.model.KafkaAclNamePatternTypeDTO;
import com.provectus.kafka.ui.model.KafkaAclResourceTypeDTO;
import com.provectus.kafka.ui.model.rbac.AccessContext;
import com.provectus.kafka.ui.model.rbac.permission.AclAction;
import com.provectus.kafka.ui.service.acl.AclsService;
import com.provectus.kafka.ui.service.rbac.AccessControlService;
import java.util.Optional;
import lombok.RequiredArgsConstructor;
import org.apache.kafka.common.resource.PatternType;
import org.apache.kafka.common.resource.ResourcePatternFilter;
import org.apache.kafka.common.resource.ResourceType;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@RestController
@RequiredArgsConstructor
public class AclsController extends AbstractController implements AclsApi {
private final AclsService aclsService;
private final AccessControlService accessControlService;
@Override
public Mono<ResponseEntity<Void>> createAcl(String clusterName, Mono<KafkaAclDTO> kafkaAclDto,
ServerWebExchange exchange) {
AccessContext context = AccessContext.builder()
.cluster(clusterName)
.aclActions(AclAction.EDIT)
.build();
return accessControlService.validateAccess(context)
.then(kafkaAclDto)
.map(ClusterMapper::toAclBinding)
.flatMap(binding -> aclsService.createAcl(getCluster(clusterName), binding))
.thenReturn(ResponseEntity.ok().build());
}
@Override
public Mono<ResponseEntity<Void>> deleteAcl(String clusterName, Mono<KafkaAclDTO> kafkaAclDto,
ServerWebExchange exchange) {
AccessContext context = AccessContext.builder()
.cluster(clusterName)
.aclActions(AclAction.EDIT)
.build();
return accessControlService.validateAccess(context)
.then(kafkaAclDto)
.map(ClusterMapper::toAclBinding)
.flatMap(binding -> aclsService.deleteAcl(getCluster(clusterName), binding))
.thenReturn(ResponseEntity.ok().build());
}
@Override
public Mono<ResponseEntity<Flux<KafkaAclDTO>>> listAcls(String clusterName,
KafkaAclResourceTypeDTO resourceTypeDto,
String resourceName,
KafkaAclNamePatternTypeDTO namePatternTypeDto,
ServerWebExchange exchange) {
AccessContext context = AccessContext.builder()
.cluster(clusterName)
.aclActions(AclAction.VIEW)
.build();
var resourceType = Optional.ofNullable(resourceTypeDto)
.map(ClusterMapper::mapAclResourceTypeDto)
.orElse(ResourceType.ANY);
var namePatternType = Optional.ofNullable(namePatternTypeDto)
.map(ClusterMapper::mapPatternTypeDto)
.orElse(PatternType.ANY);
var filter = new ResourcePatternFilter(resourceType, resourceName, namePatternType);
return accessControlService.validateAccess(context).then(
Mono.just(
ResponseEntity.ok(
aclsService.listAcls(getCluster(clusterName), filter)
.map(ClusterMapper::toKafkaAclDto)))
);
}
@Override
public Mono<ResponseEntity<String>> getAclAsCsv(String clusterName, ServerWebExchange exchange) {
AccessContext context = AccessContext.builder()
.cluster(clusterName)
.aclActions(AclAction.VIEW)
.build();
return accessControlService.validateAccess(context).then(
aclsService.getAclAsCsvString(getCluster(clusterName))
.map(ResponseEntity::ok)
.flatMap(Mono::just)
);
}
@Override
public Mono<ResponseEntity<Void>> syncAclsCsv(String clusterName, Mono<String> csvMono, ServerWebExchange exchange) {
AccessContext context = AccessContext.builder()
.cluster(clusterName)
.aclActions(AclAction.EDIT)
.build();
return accessControlService.validateAccess(context)
.then(csvMono)
.flatMap(csv -> aclsService.syncAclWithAclCsv(getCluster(clusterName), csv))
.thenReturn(ResponseEntity.ok().build());
}
}

@@ -27,6 +27,7 @@ import org.mapstruct.Mapper;
import org.mapstruct.factory.Mappers;
import org.springframework.http.ResponseEntity;
import org.springframework.http.codec.multipart.FilePart;
import org.springframework.http.codec.multipart.Part;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Flux;
@@ -92,16 +93,19 @@ public class ApplicationConfigController implements ApplicationConfigApi {
}
@Override
public Mono<ResponseEntity<UploadedFileInfoDTO>> uploadConfigRelatedFile(FilePart file, ServerWebExchange exchange) {
public Mono<ResponseEntity<UploadedFileInfoDTO>> uploadConfigRelatedFile(Flux<Part> fileFlux,
ServerWebExchange exchange) {
return accessControlService
.validateAccess(
AccessContext.builder()
.applicationConfigActions(EDIT)
.build()
)
.then(dynamicConfigOperations.uploadConfigRelatedFile(file))
.map(path -> new UploadedFileInfoDTO().location(path.toString()))
.map(ResponseEntity::ok);
.then(fileFlux.single())
.flatMap(file ->
dynamicConfigOperations.uploadConfigRelatedFile((FilePart) file)
.map(path -> new UploadedFileInfoDTO().location(path.toString()))
.map(ResponseEntity::ok));
}
@Override

@@ -43,9 +43,6 @@ import reactor.core.scheduler.Schedulers;
@Slf4j
public class MessagesController extends AbstractController implements MessagesApi {
private static final int MAX_LOAD_RECORD_LIMIT = 100;
private static final int DEFAULT_LOAD_RECORD_LIMIT = 20;
private final MessagesService messagesService;
private final DeserializationService deserializationService;
private final AccessControlService accessControlService;
@@ -91,8 +88,6 @@ public class MessagesController extends AbstractController implements MessagesApi {
seekType = seekType != null ? seekType : SeekTypeDTO.BEGINNING;
seekDirection = seekDirection != null ? seekDirection : SeekDirectionDTO.FORWARD;
filterQueryType = filterQueryType != null ? filterQueryType : MessageFilterTypeDTO.STRING_CONTAINS;
int recordsLimit =
Optional.ofNullable(limit).map(s -> Math.min(s, MAX_LOAD_RECORD_LIMIT)).orElse(DEFAULT_LOAD_RECORD_LIMIT);
var positions = new ConsumerPosition(
seekType,
@@ -103,7 +98,7 @@ public class MessagesController extends AbstractController implements MessagesApi {
ResponseEntity.ok(
messagesService.loadMessages(
getCluster(clusterName), topicName, positions, q, filterQueryType,
recordsLimit, seekDirection, keySerde, valueSerde)
limit, seekDirection, keySerde, valueSerde)
)
);

@@ -20,6 +20,9 @@ import com.provectus.kafka.ui.model.InternalPartition;
import com.provectus.kafka.ui.model.InternalReplica;
import com.provectus.kafka.ui.model.InternalTopic;
import com.provectus.kafka.ui.model.InternalTopicConfig;
import com.provectus.kafka.ui.model.KafkaAclDTO;
import com.provectus.kafka.ui.model.KafkaAclNamePatternTypeDTO;
import com.provectus.kafka.ui.model.KafkaAclResourceTypeDTO;
import com.provectus.kafka.ui.model.MetricDTO;
import com.provectus.kafka.ui.model.Metrics;
import com.provectus.kafka.ui.model.PartitionDTO;
@@ -27,12 +30,18 @@ import com.provectus.kafka.ui.model.ReplicaDTO;
import com.provectus.kafka.ui.model.TopicConfigDTO;
import com.provectus.kafka.ui.model.TopicDTO;
import com.provectus.kafka.ui.model.TopicDetailsDTO;
import com.provectus.kafka.ui.service.masking.DataMasking;
import com.provectus.kafka.ui.service.metrics.RawMetric;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.acl.AccessControlEntry;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.acl.AclPermissionType;
import org.apache.kafka.common.resource.PatternType;
import org.apache.kafka.common.resource.ResourcePattern;
import org.apache.kafka.common.resource.ResourceType;
import org.mapstruct.Mapper;
import org.mapstruct.Mapping;
@@ -109,8 +118,74 @@ public interface ClusterMapper {
return brokerDiskUsage;
}
default DataMasking map(List<ClustersProperties.Masking> maskingProperties) {
return DataMasking.create(maskingProperties);
static KafkaAclDTO.OperationEnum mapAclOperation(AclOperation operation) {
return switch (operation) {
case ALL -> KafkaAclDTO.OperationEnum.ALL;
case READ -> KafkaAclDTO.OperationEnum.READ;
case WRITE -> KafkaAclDTO.OperationEnum.WRITE;
case CREATE -> KafkaAclDTO.OperationEnum.CREATE;
case DELETE -> KafkaAclDTO.OperationEnum.DELETE;
case ALTER -> KafkaAclDTO.OperationEnum.ALTER;
case DESCRIBE -> KafkaAclDTO.OperationEnum.DESCRIBE;
case CLUSTER_ACTION -> KafkaAclDTO.OperationEnum.CLUSTER_ACTION;
case DESCRIBE_CONFIGS -> KafkaAclDTO.OperationEnum.DESCRIBE_CONFIGS;
case ALTER_CONFIGS -> KafkaAclDTO.OperationEnum.ALTER_CONFIGS;
case IDEMPOTENT_WRITE -> KafkaAclDTO.OperationEnum.IDEMPOTENT_WRITE;
case CREATE_TOKENS -> KafkaAclDTO.OperationEnum.CREATE_TOKENS;
case DESCRIBE_TOKENS -> KafkaAclDTO.OperationEnum.DESCRIBE_TOKENS;
case ANY -> throw new IllegalArgumentException("ANY operation can be only part of filter");
case UNKNOWN -> KafkaAclDTO.OperationEnum.UNKNOWN;
};
}
static KafkaAclResourceTypeDTO mapAclResourceType(ResourceType resourceType) {
return switch (resourceType) {
case CLUSTER -> KafkaAclResourceTypeDTO.CLUSTER;
case TOPIC -> KafkaAclResourceTypeDTO.TOPIC;
case GROUP -> KafkaAclResourceTypeDTO.GROUP;
case DELEGATION_TOKEN -> KafkaAclResourceTypeDTO.DELEGATION_TOKEN;
case TRANSACTIONAL_ID -> KafkaAclResourceTypeDTO.TRANSACTIONAL_ID;
case USER -> KafkaAclResourceTypeDTO.USER;
case ANY -> throw new IllegalArgumentException("ANY type can be only part of filter");
case UNKNOWN -> KafkaAclResourceTypeDTO.UNKNOWN;
};
}
static ResourceType mapAclResourceTypeDto(KafkaAclResourceTypeDTO dto) {
return ResourceType.valueOf(dto.name());
}
static PatternType mapPatternTypeDto(KafkaAclNamePatternTypeDTO dto) {
return PatternType.valueOf(dto.name());
}
static AclBinding toAclBinding(KafkaAclDTO dto) {
return new AclBinding(
new ResourcePattern(
mapAclResourceTypeDto(dto.getResourceType()),
dto.getResourceName(),
mapPatternTypeDto(dto.getNamePatternType())
),
new AccessControlEntry(
dto.getPrincipal(),
dto.getHost(),
AclOperation.valueOf(dto.getOperation().name()),
AclPermissionType.valueOf(dto.getPermission().name())
)
);
}
static KafkaAclDTO toKafkaAclDto(AclBinding binding) {
var pattern = binding.pattern();
var filter = binding.toFilter().entryFilter();
return new KafkaAclDTO()
.resourceType(mapAclResourceType(pattern.resourceType()))
.resourceName(pattern.name())
.namePatternType(KafkaAclNamePatternTypeDTO.fromValue(pattern.patternType().name()))
.principal(filter.principal())
.host(filter.host())
.operation(mapAclOperation(filter.operation()))
.permission(KafkaAclDTO.PermissionEnum.fromValue(filter.permissionType().name()));
}
}

@@ -11,8 +11,6 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
@@ -82,15 +80,8 @@ public class ConsumerGroupMapper {
InternalConsumerGroup c, T consumerGroup) {
consumerGroup.setGroupId(c.getGroupId());
consumerGroup.setMembers(c.getMembers().size());
int numTopics = Stream.concat(
c.getOffsets().keySet().stream().map(TopicPartition::topic),
c.getMembers().stream()
.flatMap(m -> m.getAssignment().stream().map(TopicPartition::topic))
).collect(Collectors.toSet()).size();
consumerGroup.setMessagesBehind(c.getMessagesBehind());
consumerGroup.setTopics(numTopics);
consumerGroup.setTopics(c.getTopicNum());
consumerGroup.setSimple(c.isSimple());
Optional.ofNullable(c.getState())

@@ -4,5 +4,7 @@ public enum ClusterFeature {
KAFKA_CONNECT,
KSQL_DB,
SCHEMA_REGISTRY,
TOPIC_DELETION
TOPIC_DELETION,
KAFKA_ACL_VIEW,
KAFKA_ACL_EDIT
}

@@ -1,6 +1,7 @@
package com.provectus.kafka.ui.model;
import java.math.BigDecimal;
import javax.annotation.Nullable;
import lombok.Data;
import org.apache.kafka.common.Node;
@@ -10,15 +11,27 @@ public class InternalBroker {
private final Integer id;
private final String host;
private final Integer port;
private final BigDecimal bytesInPerSec;
private final BigDecimal bytesOutPerSec;
private final @Nullable BigDecimal bytesInPerSec;
private final @Nullable BigDecimal bytesOutPerSec;
private final @Nullable Integer partitionsLeader;
private final @Nullable Integer partitions;
private final @Nullable Integer inSyncPartitions;
private final @Nullable BigDecimal leadersSkew;
private final @Nullable BigDecimal partitionsSkew;
public InternalBroker(Node node, Statistics statistics) {
public InternalBroker(Node node,
PartitionDistributionStats partitionDistribution,
Statistics statistics) {
this.id = node.id();
this.host = node.host();
this.port = node.port();
this.bytesInPerSec = statistics.getMetrics().getBrokerBytesInPerSec().get(node.id());
this.bytesOutPerSec = statistics.getMetrics().getBrokerBytesOutPerSec().get(node.id());
this.partitionsLeader = partitionDistribution.getPartitionLeaders().get(node);
this.partitions = partitionDistribution.getPartitionsCount().get(node);
this.inSyncPartitions = partitionDistribution.getInSyncPartitions().get(node);
this.leadersSkew = partitionDistribution.leadersSkew(node);
this.partitionsSkew = partitionDistribution.partitionsSkew(node);
}
}

@@ -5,6 +5,7 @@ import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import lombok.Builder;
import lombok.Data;
import org.apache.kafka.clients.admin.ConsumerGroupDescription;
@@ -21,6 +22,7 @@ public class InternalConsumerGroup {
private final Map<TopicPartition, Long> offsets;
private final Map<TopicPartition, Long> endOffsets;
private final Long messagesBehind;
private final Integer topicNum;
private final String partitionAssignor;
private final ConsumerGroupState state;
private final Node coordinator;
@@ -44,22 +46,12 @@ public class InternalConsumerGroup {
builder.simple(description.isSimpleConsumerGroup());
builder.state(description.state());
builder.partitionAssignor(description.partitionAssignor());
builder.members(
description.members().stream()
.map(m ->
InternalConsumerGroup.InternalMember.builder()
.assignment(m.assignment().topicPartitions())
.clientId(m.clientId())
.groupInstanceId(m.groupInstanceId().orElse(""))
.consumerId(m.consumerId())
.clientId(m.clientId())
.host(m.host())
.build()
).collect(Collectors.toList())
);
Collection<InternalMember> internalMembers = initInternalMembers(description);
builder.members(internalMembers);
builder.offsets(groupOffsets);
builder.endOffsets(topicEndOffsets);
builder.messagesBehind(calculateMessagesBehind(groupOffsets, topicEndOffsets));
builder.topicNum(calculateTopicNum(groupOffsets, internalMembers));
Optional.ofNullable(description.coordinator()).ifPresent(builder::coordinator);
return builder.build();
}
@@ -80,4 +72,31 @@ public class InternalConsumerGroup {
return messagesBehind;
}
private static Integer calculateTopicNum(Map<TopicPartition, Long> offsets, Collection<InternalMember> members) {
long topicNum = Stream.concat(
offsets.keySet().stream().map(TopicPartition::topic),
members.stream()
.flatMap(m -> m.getAssignment().stream().map(TopicPartition::topic))
).distinct().count();
return Integer.valueOf((int) topicNum);
}
private static Collection<InternalMember> initInternalMembers(ConsumerGroupDescription description) {
return description.members().stream()
.map(m ->
InternalConsumerGroup.InternalMember.builder()
.assignment(m.assignment().topicPartitions())
.clientId(m.clientId())
.groupInstanceId(m.groupInstanceId().orElse(""))
.consumerId(m.consumerId())
.clientId(m.clientId())
.host(m.host())
.build()
).collect(Collectors.toList());
}
}

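A minimal standalone sketch (with made-up topic names) of the distinct-topic count that the new calculateTopicNum helper performs over committed offsets plus member assignments:

    import java.util.List;
    import java.util.stream.Stream;
    import org.apache.kafka.common.TopicPartition;

    public class TopicNumExample {
      public static void main(String[] args) {
        var committed = List.of(new TopicPartition("orders", 0), new TopicPartition("payments", 0));
        var assigned = List.of(new TopicPartition("payments", 1), new TopicPartition("audit", 0));
        long topicNum = Stream.concat(
            committed.stream().map(TopicPartition::topic),
            assigned.stream().map(TopicPartition::topic)
        ).distinct().count();
        System.out.println(topicNum); // 3: "payments" is counted once
      }
    }
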
@@ -0,0 +1,93 @@
package com.provectus.kafka.ui.model;
import java.math.BigDecimal;
import java.math.MathContext;
import java.util.HashMap;
import java.util.Map;
import javax.annotation.Nullable;
import lombok.AccessLevel;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartitionInfo;
@RequiredArgsConstructor(access = AccessLevel.PRIVATE)
@Getter
@Slf4j
public class PartitionDistributionStats {
// avg skew will show unuseful results on low number of partitions
private static final int MIN_PARTITIONS_FOR_SKEW_CALCULATION = 50;
private static final MathContext ROUNDING_MATH_CTX = new MathContext(3);
private final Map<Node, Integer> partitionLeaders;
private final Map<Node, Integer> partitionsCount;
private final Map<Node, Integer> inSyncPartitions;
private final double avgLeadersCntPerBroker;
private final double avgPartitionsPerBroker;
private final boolean skewCanBeCalculated;
public static PartitionDistributionStats create(Statistics stats) {
return create(stats, MIN_PARTITIONS_FOR_SKEW_CALCULATION);
}
static PartitionDistributionStats create(Statistics stats, int minPartitionsForSkewCalculation) {
var partitionLeaders = new HashMap<Node, Integer>();
var partitionsReplicated = new HashMap<Node, Integer>();
var isr = new HashMap<Node, Integer>();
int partitionsCnt = 0;
for (TopicDescription td : stats.getTopicDescriptions().values()) {
for (TopicPartitionInfo tp : td.partitions()) {
partitionsCnt++;
tp.replicas().forEach(r -> incr(partitionsReplicated, r));
tp.isr().forEach(r -> incr(isr, r));
if (tp.leader() != null) {
incr(partitionLeaders, tp.leader());
}
}
}
int nodesWithPartitions = partitionsReplicated.size();
int partitionReplications = partitionsReplicated.values().stream().mapToInt(i -> i).sum();
var avgPartitionsPerBroker = nodesWithPartitions == 0 ? 0 : ((double) partitionReplications) / nodesWithPartitions;
int nodesWithLeaders = partitionLeaders.size();
int leadersCnt = partitionLeaders.values().stream().mapToInt(i -> i).sum();
var avgLeadersCntPerBroker = nodesWithLeaders == 0 ? 0 : ((double) leadersCnt) / nodesWithLeaders;
return new PartitionDistributionStats(
partitionLeaders,
partitionsReplicated,
isr,
avgLeadersCntPerBroker,
avgPartitionsPerBroker,
partitionsCnt >= minPartitionsForSkewCalculation
);
}
private static void incr(Map<Node, Integer> map, Node n) {
map.compute(n, (k, c) -> c == null ? 1 : ++c);
}
@Nullable
public BigDecimal partitionsSkew(Node node) {
return calculateAvgSkew(partitionsCount.get(node), avgPartitionsPerBroker);
}
@Nullable
public BigDecimal leadersSkew(Node node) {
return calculateAvgSkew(partitionLeaders.get(node), avgLeadersCntPerBroker);
}
// Returns difference (in percents) from average value, null if it can't be calculated
@Nullable
private BigDecimal calculateAvgSkew(@Nullable Integer value, double avgValue) {
if (avgValue == 0 || !skewCanBeCalculated) {
return null;
}
value = value == null ? 0 : value;
return new BigDecimal((value - avgValue) / avgValue * 100.0).round(ROUNDING_MATH_CTX);
}
}

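A worked example of the skew formula above, (value - avg) / avg * 100 rounded to three significant digits; the broker and cluster numbers are invented:

    import java.math.BigDecimal;
    import java.math.MathContext;

    public class SkewExample {
      public static void main(String[] args) {
        double brokerPartitions = 120; // partitions replicated on this broker (hypothetical)
        double clusterAvg = 100;       // average partitions per broker across the cluster
        BigDecimal skew = new BigDecimal((brokerPartitions - clusterAvg) / clusterAvg * 100.0)
            .round(new MathContext(3));
        System.out.println(skew); // 20: this broker holds 20% more partitions than average
      }
    }
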
@@ -1,5 +1,6 @@
package com.provectus.kafka.ui.model.rbac;
import com.provectus.kafka.ui.model.rbac.permission.AclAction;
import com.provectus.kafka.ui.model.rbac.permission.ApplicationConfigAction;
import com.provectus.kafka.ui.model.rbac.permission.ClusterConfigAction;
import com.provectus.kafka.ui.model.rbac.permission.ConnectAction;
@@ -37,6 +38,8 @@ public class AccessContext {
Collection<KsqlAction> ksqlActions;
Collection<AclAction> aclActions;
public static AccessContextBuilder builder() {
return new AccessContextBuilder();
}
@@ -55,6 +58,7 @@ public class AccessContext {
private String schema;
private Collection<SchemaAction> schemaActions = Collections.emptySet();
private Collection<KsqlAction> ksqlActions = Collections.emptySet();
private Collection<AclAction> aclActions = Collections.emptySet();
private AccessContextBuilder() {
}
@@ -131,6 +135,12 @@ public class AccessContext {
return this;
}
public AccessContextBuilder aclActions(AclAction... actions) {
Assert.isTrue(actions.length > 0, "actions not present");
this.aclActions = List.of(actions);
return this;
}
public AccessContext build() {
return new AccessContext(
applicationConfigActions,
@@ -140,7 +150,7 @@ public class AccessContext {
connect, connectActions,
connector,
schema, schemaActions,
ksqlActions);
ksqlActions, aclActions);
}
}
}

@@ -4,6 +4,7 @@ import static com.provectus.kafka.ui.model.rbac.Resource.APPLICATIONCONFIG;
import static com.provectus.kafka.ui.model.rbac.Resource.CLUSTERCONFIG;
import static com.provectus.kafka.ui.model.rbac.Resource.KSQL;
import com.provectus.kafka.ui.model.rbac.permission.AclAction;
import com.provectus.kafka.ui.model.rbac.permission.ApplicationConfigAction;
import com.provectus.kafka.ui.model.rbac.permission.ClusterConfigAction;
import com.provectus.kafka.ui.model.rbac.permission.ConnectAction;
@@ -76,6 +77,7 @@ public class Permission {
case SCHEMA -> Arrays.stream(SchemaAction.values()).map(Enum::toString).toList();
case CONNECT -> Arrays.stream(ConnectAction.values()).map(Enum::toString).toList();
case KSQL -> Arrays.stream(KsqlAction.values()).map(Enum::toString).toList();
case ACL -> Arrays.stream(AclAction.values()).map(Enum::toString).toList();
};
}

@@ -11,7 +11,8 @@ public enum Resource {
CONSUMER,
SCHEMA,
CONNECT,
KSQL;
KSQL,
ACL;
@Nullable
public static Resource fromString(String name) {

@@ -0,0 +1,15 @@
package com.provectus.kafka.ui.model.rbac.permission;
import org.apache.commons.lang3.EnumUtils;
import org.jetbrains.annotations.Nullable;
public enum AclAction implements PermissibleAction {
VIEW,
EDIT;
@Nullable
public static AclAction fromString(String name) {
return EnumUtils.getEnum(AclAction.class, name);
}
}

@@ -123,11 +123,11 @@ public class ConsumerRecordDeserializer {
}
private static Long getKeySize(ConsumerRecord<Bytes, Bytes> consumerRecord) {
return consumerRecord.key() != null ? (long) consumerRecord.key().get().length : null;
return consumerRecord.key() != null ? (long) consumerRecord.serializedKeySize() : null;
}
private static Long getValueSize(ConsumerRecord<Bytes, Bytes> consumerRecord) {
return consumerRecord.value() != null ? (long) consumerRecord.value().get().length : null;
return consumerRecord.value() != null ? (long) consumerRecord.serializedValueSize() : null;
}
private static int headerSize(Header header) {

@@ -122,8 +122,6 @@ public class SerdesInitializer {
registeredSerdes,
Optional.ofNullable(clusterProperties.getDefaultKeySerde())
.map(name -> Preconditions.checkNotNull(registeredSerdes.get(name), "Default key serde not found"))
.or(() -> Optional.ofNullable(registeredSerdes.get(SchemaRegistrySerde.name())))
.or(() -> Optional.ofNullable(registeredSerdes.get(ProtobufFileSerde.name())))
.orElse(null),
Optional.ofNullable(clusterProperties.getDefaultValueSerde())
.map(name -> Preconditions.checkNotNull(registeredSerdes.get(name), "Default value serde not found"))

@@ -1,33 +1,36 @@
package com.provectus.kafka.ui.service;
import com.provectus.kafka.ui.config.ClustersProperties;
import com.provectus.kafka.ui.model.KafkaCluster;
import com.provectus.kafka.ui.util.SslPropertiesUtil;
import java.io.Closeable;
import java.time.Instant;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import lombok.RequiredArgsConstructor;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Mono;
@Service
@RequiredArgsConstructor
@Slf4j
public class AdminClientServiceImpl implements AdminClientService, Closeable {
private static final int DEFAULT_CLIENT_TIMEOUT_MS = 30_000;
private static final AtomicLong CLIENT_ID_SEQ = new AtomicLong();
private final Map<String, ReactiveAdminClient> adminClientCache = new ConcurrentHashMap<>();
@Setter // used in tests
@Value("${kafka.admin-client-timeout:30000}")
private int clientTimeout;
private final int clientTimeout;
public AdminClientServiceImpl(ClustersProperties clustersProperties) {
this.clientTimeout = Optional.ofNullable(clustersProperties.getAdminClientTimeout())
.orElse(DEFAULT_CLIENT_TIMEOUT_MS);
}
@Override
public Mono<ReactiveAdminClient> get(KafkaCluster cluster) {
@ -42,7 +45,7 @@ public class AdminClientServiceImpl implements AdminClientService, Closeable {
SslPropertiesUtil.addKafkaSslProperties(cluster.getOriginalProperties().getSsl(), properties);
properties.putAll(cluster.getProperties());
properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.getBootstrapServers());
properties.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, clientTimeout);
properties.putIfAbsent(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, clientTimeout);
properties.putIfAbsent(
AdminClientConfig.CLIENT_ID_CONFIG,
"kafka-ui-admin-" + Instant.now().getEpochSecond() + "-" + CLIENT_ID_SEQ.incrementAndGet()

View file

@ -10,6 +10,7 @@ import com.provectus.kafka.ui.model.BrokersLogdirsDTO;
import com.provectus.kafka.ui.model.InternalBroker;
import com.provectus.kafka.ui.model.InternalBrokerConfig;
import com.provectus.kafka.ui.model.KafkaCluster;
import com.provectus.kafka.ui.model.PartitionDistributionStats;
import com.provectus.kafka.ui.service.metrics.RawMetric;
import java.util.Collections;
import java.util.HashMap;
@ -64,11 +65,13 @@ public class BrokerService {
}
public Flux<InternalBroker> getBrokers(KafkaCluster cluster) {
var stats = statisticsCache.get(cluster);
var partitionsDistribution = PartitionDistributionStats.create(stats);
return adminClientService
.get(cluster)
.flatMap(ReactiveAdminClient::describeCluster)
.map(description -> description.getNodes().stream()
.map(node -> new InternalBroker(node, statisticsCache.get(cluster)))
.map(node -> new InternalBroker(node, partitionsDistribution, stats))
.collect(Collectors.toList()))
.flatMapMany(Flux::fromIterable);
}

View file

@ -101,6 +101,9 @@ public class ConsumerGroupService {
public record ConsumerGroupsPage(List<InternalConsumerGroup> consumerGroups, int totalPages) {
}
private record GroupWithDescr(InternalConsumerGroup icg, ConsumerGroupDescription cgd) {
}
public Mono<ConsumerGroupsPage> getConsumerGroupsPage(
KafkaCluster cluster,
int pageNum,
@ -159,22 +162,19 @@ public class ConsumerGroupService {
sortAndPaginate(descriptions.values(), comparator, pageNum, perPage, sortOrderDto).toList());
}
case MESSAGES_BEHIND -> {
record GroupWithDescr(InternalConsumerGroup icg, ConsumerGroupDescription cgd) { }
Comparator<GroupWithDescr> comparator = Comparator.comparingLong(gwd ->
gwd.icg.getMessagesBehind() == null ? 0L : gwd.icg.getMessagesBehind());
var groupNames = groups.stream().map(ConsumerGroupListing::groupId).toList();
yield loadDescriptionsByInternalConsumerGroups(ac, groups, comparator, pageNum, perPage, sortOrderDto);
}
case TOPIC_NUM -> {
Comparator<GroupWithDescr> comparator = Comparator.comparingInt(gwd -> gwd.icg.getTopicNum());
yield loadDescriptionsByInternalConsumerGroups(ac, groups, comparator, pageNum, perPage, sortOrderDto);
yield ac.describeConsumerGroups(groupNames)
.flatMap(descriptionsMap -> {
List<ConsumerGroupDescription> descriptions = descriptionsMap.values().stream().toList();
return getConsumerGroups(ac, descriptions)
.map(icg -> Streams.zip(icg.stream(), descriptions.stream(), GroupWithDescr::new).toList())
.map(gwd -> sortAndPaginate(gwd, comparator, pageNum, perPage, sortOrderDto)
.map(GroupWithDescr::cgd).toList());
}
);
}
};
}
@ -209,6 +209,27 @@ public class ConsumerGroupService {
.map(cgs -> new ArrayList<>(cgs.values()));
}
private Mono<List<ConsumerGroupDescription>> loadDescriptionsByInternalConsumerGroups(ReactiveAdminClient ac,
List<ConsumerGroupListing> groups,
Comparator<GroupWithDescr> comparator,
int pageNum,
int perPage,
SortOrderDTO sortOrderDto) {
var groupNames = groups.stream().map(ConsumerGroupListing::groupId).toList();
return ac.describeConsumerGroups(groupNames)
.flatMap(descriptionsMap -> {
List<ConsumerGroupDescription> descriptions = descriptionsMap.values().stream().toList();
return getConsumerGroups(ac, descriptions)
.map(icg -> Streams.zip(icg.stream(), descriptions.stream(), GroupWithDescr::new).toList())
.map(gwd -> sortAndPaginate(gwd, comparator, pageNum, perPage, sortOrderDto)
.map(GroupWithDescr::cgd).toList());
}
);
}
public Mono<InternalConsumerGroup> getConsumerGroupDetail(KafkaCluster cluster,
String consumerGroupId) {
return adminClientService.get(cluster)

View file

@ -2,16 +2,19 @@ package com.provectus.kafka.ui.service;
import com.provectus.kafka.ui.model.ClusterFeature;
import com.provectus.kafka.ui.model.KafkaCluster;
import com.provectus.kafka.ui.service.ReactiveAdminClient.ClusterDescription;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.Predicate;
import javax.annotation.Nullable;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.acl.AclOperation;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@ -26,7 +29,7 @@ public class FeatureService {
private final AdminClientService adminClientService;
public Mono<List<ClusterFeature>> getAvailableFeatures(KafkaCluster cluster,
ReactiveAdminClient.ClusterDescription clusterDescription) {
ClusterDescription clusterDescription) {
List<Mono<ClusterFeature>> features = new ArrayList<>();
if (Optional.ofNullable(cluster.getConnectsClients())
@ -44,6 +47,8 @@ public class FeatureService {
}
features.add(topicDeletionEnabled(cluster, clusterDescription.getController()));
features.add(aclView(cluster));
features.add(aclEdit(clusterDescription));
return Flux.fromIterable(features).flatMap(m -> m).collectList();
}
@ -65,4 +70,20 @@ public class FeatureService {
? Mono.just(ClusterFeature.TOPIC_DELETION)
: Mono.empty());
}
private Mono<ClusterFeature> aclEdit(ClusterDescription clusterDescription) {
var authorizedOps = Optional.ofNullable(clusterDescription.getAuthorizedOperations()).orElse(Set.of());
boolean canEdit = authorizedOps.contains(AclOperation.ALL) || authorizedOps.contains(AclOperation.ALTER);
return canEdit
? Mono.just(ClusterFeature.KAFKA_ACL_EDIT)
: Mono.empty();
}
private Mono<ClusterFeature> aclView(KafkaCluster cluster) {
return adminClientService.get(cluster).flatMap(
ac -> ac.getClusterFeatures().contains(ReactiveAdminClient.SupportedFeature.AUTHORIZED_SECURITY_ENABLED)
? Mono.just(ClusterFeature.KAFKA_ACL_VIEW)
: Mono.empty()
);
}
}

View file

@ -2,6 +2,7 @@ package com.provectus.kafka.ui.service;
import com.provectus.kafka.ui.client.RetryingKafkaConnectClient;
import com.provectus.kafka.ui.config.ClustersProperties;
import com.provectus.kafka.ui.config.WebclientProperties;
import com.provectus.kafka.ui.connect.api.KafkaConnectClientApi;
import com.provectus.kafka.ui.emitter.PollingSettings;
import com.provectus.kafka.ui.model.ApplicationPropertyValidationDTO;
@ -22,9 +23,7 @@ import java.util.Optional;
import java.util.Properties;
import java.util.stream.Stream;
import javax.annotation.Nullable;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import org.springframework.util.unit.DataSize;
import org.springframework.web.reactive.function.client.WebClient;
@ -34,12 +33,18 @@ import reactor.util.function.Tuple2;
import reactor.util.function.Tuples;
@Service
@RequiredArgsConstructor
@Slf4j
public class KafkaClusterFactory {
@Value("${webclient.max-in-memory-buffer-size:20MB}")
private DataSize maxBuffSize;
private static final DataSize DEFAULT_WEBCLIENT_BUFFER = DataSize.parse("20MB");
private final DataSize webClientMaxBuffSize;
public KafkaClusterFactory(WebclientProperties webclientProperties) {
this.webClientMaxBuffSize = Optional.ofNullable(webclientProperties.getMaxInMemoryBufferSize())
.map(DataSize::parse)
.orElse(DEFAULT_WEBCLIENT_BUFFER);
}
public KafkaCluster create(ClustersProperties properties,
ClustersProperties.Cluster clusterProperties) {
@ -140,7 +145,7 @@ public class KafkaClusterFactory {
url -> new RetryingKafkaConnectClient(
connectCluster.toBuilder().address(url).build(),
cluster.getSsl(),
maxBuffSize
webClientMaxBuffSize
),
ReactiveFailover.CONNECTION_REFUSED_EXCEPTION_FILTER,
"No alive connect instances available",
@ -158,7 +163,7 @@ public class KafkaClusterFactory {
WebClient webClient = new WebClientConfigurator()
.configureSsl(clusterProperties.getSsl(), clusterProperties.getSchemaRegistrySsl())
.configureBasicAuth(auth.getUsername(), auth.getPassword())
.configureBufferSize(maxBuffSize)
.configureBufferSize(webClientMaxBuffSize)
.build();
return ReactiveFailover.create(
parseUrlList(clusterProperties.getSchemaRegistry()),
@ -181,7 +186,7 @@ public class KafkaClusterFactory {
clusterProperties.getKsqldbServerAuth(),
clusterProperties.getSsl(),
clusterProperties.getKsqldbServerSsl(),
maxBuffSize
webClientMaxBuffSize
),
ReactiveFailover.CONNECTION_REFUSED_EXCEPTION_FILTER,
"No live ksqldb instances available",

View file

@ -109,6 +109,7 @@ public class KafkaConnectService {
private Stream<String> getStringsForSearch(FullConnectorInfoDTO fullConnectorInfo) {
return Stream.of(
fullConnectorInfo.getName(),
fullConnectorInfo.getConnect(),
fullConnectorInfo.getStatus().getState().getValue(),
fullConnectorInfo.getType().getValue());
}

View file

@ -1,6 +1,7 @@
package com.provectus.kafka.ui.service;
import com.google.common.util.concurrent.RateLimiter;
import com.provectus.kafka.ui.config.ClustersProperties;
import com.provectus.kafka.ui.emitter.BackwardRecordEmitter;
import com.provectus.kafka.ui.emitter.ForwardRecordEmitter;
import com.provectus.kafka.ui.emitter.MessageFilters;
@ -20,13 +21,13 @@ import com.provectus.kafka.ui.serdes.ProducerRecordCreator;
import com.provectus.kafka.ui.util.SslPropertiesUtil;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.function.Predicate;
import java.util.function.UnaryOperator;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.admin.OffsetSpec;
@ -44,16 +45,35 @@ import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
@Service
@RequiredArgsConstructor
@Slf4j
public class MessagesService {
private static final int DEFAULT_MAX_PAGE_SIZE = 500;
private static final int DEFAULT_PAGE_SIZE = 100;
// limiting UI messages rate to 20/sec in tailing mode
public static final int TAILING_UI_MESSAGE_THROTTLE_RATE = 20;
private static final int TAILING_UI_MESSAGE_THROTTLE_RATE = 20;
private final AdminClientService adminClientService;
private final DeserializationService deserializationService;
private final ConsumerGroupService consumerGroupService;
private final int maxPageSize;
private final int defaultPageSize;
public MessagesService(AdminClientService adminClientService,
DeserializationService deserializationService,
ConsumerGroupService consumerGroupService,
ClustersProperties properties) {
this.adminClientService = adminClientService;
this.deserializationService = deserializationService;
this.consumerGroupService = consumerGroupService;
var pollingProps = Optional.ofNullable(properties.getPolling())
.orElseGet(ClustersProperties.PollingProperties::new);
this.maxPageSize = Optional.ofNullable(pollingProps.getMaxPageSize())
.orElse(DEFAULT_MAX_PAGE_SIZE);
this.defaultPageSize = Optional.ofNullable(pollingProps.getDefaultPageSize())
.orElse(DEFAULT_PAGE_SIZE);
}
private Mono<TopicDescription> withExistingTopic(KafkaCluster cluster, String topicName) {
return adminClientService.get(cluster)
@ -139,7 +159,7 @@ public class MessagesService {
ConsumerPosition consumerPosition,
@Nullable String query,
MessageFilterTypeDTO filterQueryType,
int limit,
@Nullable Integer pageSize,
SeekDirectionDTO seekDirection,
@Nullable String keySerde,
@Nullable String valueSerde) {
@ -147,7 +167,13 @@ public class MessagesService {
.flux()
.publishOn(Schedulers.boundedElastic())
.flatMap(td -> loadMessagesImpl(cluster, topic, consumerPosition, query,
filterQueryType, limit, seekDirection, keySerde, valueSerde));
filterQueryType, fixPageSize(pageSize), seekDirection, keySerde, valueSerde));
}
private int fixPageSize(@Nullable Integer pageSize) {
return Optional.ofNullable(pageSize)
.filter(ps -> ps > 0 && ps <= maxPageSize)
.orElse(defaultPageSize);
}
private Flux<TopicMessageEventDTO> loadMessagesImpl(KafkaCluster cluster,
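Page sizing is now configurable rather than hard-coded, with fixPageSize() clamping any client-supplied value into (0, maxPageSize]. A sketch of the properties involved, assuming they bind under kafka.polling per ClustersProperties.PollingProperties:
kafka:
  polling:
    maxPageSize: 500      # upper bound for a requested page size
    defaultPageSize: 100  # used when pageSize is absent or out of range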

View file

@ -5,6 +5,7 @@ import static java.util.stream.Collectors.toMap;
import static org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableTable;
import com.google.common.collect.Iterables;
import com.google.common.collect.Table;
@ -15,7 +16,6 @@ import com.provectus.kafka.ui.util.KafkaVersion;
import com.provectus.kafka.ui.util.annotation.KafkaClientInternalsDependant;
import java.io.Closeable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
@ -61,16 +61,22 @@ import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.TopicPartitionInfo;
import org.apache.kafka.common.TopicPartitionReplica;
import org.apache.kafka.common.acl.AccessControlEntryFilter;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.acl.AclBindingFilter;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.errors.ClusterAuthorizationException;
import org.apache.kafka.common.errors.GroupIdNotFoundException;
import org.apache.kafka.common.errors.GroupNotEmptyException;
import org.apache.kafka.common.errors.InvalidRequestException;
import org.apache.kafka.common.errors.SecurityDisabledException;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.requests.DescribeLogDirsResponse;
import org.apache.kafka.common.resource.ResourcePattern;
import org.apache.kafka.common.resource.ResourcePatternFilter;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
@ -82,26 +88,29 @@ import reactor.util.function.Tuples;
@RequiredArgsConstructor
public class ReactiveAdminClient implements Closeable {
private enum SupportedFeature {
public enum SupportedFeature {
INCREMENTAL_ALTER_CONFIGS(2.3f),
CONFIG_DOCUMENTATION_RETRIEVAL(2.6f),
DESCRIBE_CLUSTER_INCLUDE_AUTHORIZED_OPERATIONS(2.3f);
DESCRIBE_CLUSTER_INCLUDE_AUTHORIZED_OPERATIONS(2.3f),
AUTHORIZED_SECURITY_ENABLED(ReactiveAdminClient::isAuthorizedSecurityEnabled);
private final float sinceVersion;
private final BiFunction<AdminClient, Float, Mono<Boolean>> predicate;
SupportedFeature(float sinceVersion) {
this.sinceVersion = sinceVersion;
SupportedFeature(BiFunction<AdminClient, Float, Mono<Boolean>> predicate) {
this.predicate = predicate;
}
static Set<SupportedFeature> forVersion(float kafkaVersion) {
return Arrays.stream(SupportedFeature.values())
.filter(f -> kafkaVersion >= f.sinceVersion)
SupportedFeature(float fromVersion) {
this.predicate = (admin, ver) -> Mono.just(ver != null && ver >= fromVersion);
}
static Mono<Set<SupportedFeature>> forVersion(AdminClient ac, @Nullable Float kafkaVersion) {
return Flux.fromArray(SupportedFeature.values())
.flatMap(f -> f.predicate.apply(ac, kafkaVersion).map(enabled -> Tuples.of(f, enabled)))
.filter(Tuple2::getT2)
.map(Tuple2::getT1)
.collect(Collectors.toSet());
}
static Set<SupportedFeature> defaultFeatures() {
return Set.of();
}
}
@Value
@ -110,25 +119,31 @@ public class ReactiveAdminClient implements Closeable {
Node controller;
String clusterId;
Collection<Node> nodes;
@Nullable // null, if ACL is disabled
Set<AclOperation> authorizedOperations;
}
public static Mono<ReactiveAdminClient> create(AdminClient adminClient) {
return getClusterVersion(adminClient)
.map(ver ->
new ReactiveAdminClient(
adminClient,
ver,
getSupportedUpdateFeaturesForVersion(ver)));
.flatMap(ver ->
getSupportedUpdateFeaturesForVersion(adminClient, ver)
.map(features ->
new ReactiveAdminClient(adminClient, ver, features)));
}
private static Set<SupportedFeature> getSupportedUpdateFeaturesForVersion(String versionStr) {
try {
float version = KafkaVersion.parse(versionStr);
return SupportedFeature.forVersion(version);
} catch (NumberFormatException e) {
return SupportedFeature.defaultFeatures();
}
private static Mono<Set<SupportedFeature>> getSupportedUpdateFeaturesForVersion(AdminClient ac, String versionStr) {
@Nullable Float kafkaVersion = KafkaVersion.parse(versionStr).orElse(null);
return SupportedFeature.forVersion(ac, kafkaVersion);
}
private static Mono<Boolean> isAuthorizedSecurityEnabled(AdminClient ac, @Nullable Float kafkaVersion) {
return toMono(ac.describeAcls(AclBindingFilter.ANY).values())
.thenReturn(true)
.doOnError(th -> !(th instanceof SecurityDisabledException)
&& !(th instanceof InvalidRequestException)
&& !(th instanceof UnsupportedVersionException),
th -> log.warn("Error checking if security enabled", th))
.onErrorReturn(false);
}
// NOTE: if KafkaFuture returns null, that Mono will be empty(!), since Reactor does not support nullable results
@ -162,6 +177,10 @@ public class ReactiveAdminClient implements Closeable {
private final String version;
private final Set<SupportedFeature> features;
public Set<SupportedFeature> getClusterFeatures() {
return features;
}
public Mono<Set<String>> listTopics(boolean listInternal) {
return toMono(client.listTopics(new ListTopicsOptions().listInternal(listInternal)).names());
}
@ -576,6 +595,22 @@ public class ReactiveAdminClient implements Closeable {
);
}
public Mono<Collection<AclBinding>> listAcls(ResourcePatternFilter filter) {
Preconditions.checkArgument(features.contains(SupportedFeature.AUTHORIZED_SECURITY_ENABLED));
return toMono(client.describeAcls(new AclBindingFilter(filter, AccessControlEntryFilter.ANY)).values());
}
public Mono<Void> createAcls(Collection<AclBinding> aclBindings) {
Preconditions.checkArgument(features.contains(SupportedFeature.AUTHORIZED_SECURITY_ENABLED));
return toMono(client.createAcls(aclBindings).all());
}
public Mono<Void> deleteAcls(Collection<AclBinding> aclBindings) {
Preconditions.checkArgument(features.contains(SupportedFeature.AUTHORIZED_SECURITY_ENABLED));
var filters = aclBindings.stream().map(AclBinding::toFilter).collect(Collectors.toSet());
return toMono(client.deleteAcls(filters).all()).then();
}
public Mono<Void> updateBrokerConfigByName(Integer brokerId, String name, String value) {
ConfigResource cr = new ConfigResource(ConfigResource.Type.BROKER, String.valueOf(brokerId));
AlterConfigOp op = new AlterConfigOp(new ConfigEntry(name, value), AlterConfigOp.OpType.SET);
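All three ACL methods require the AUTHORIZED_SECURITY_ENABLED feature, which is detected by probing describeAcls and mapping SecurityDisabledException, InvalidRequestException and UnsupportedVersionException to "disabled". A hypothetical caller would gate on the feature first:
// hypothetical calling code - a sketch, not part of this commit
if (ac.getClusterFeatures().contains(ReactiveAdminClient.SupportedFeature.AUTHORIZED_SECURITY_ENABLED)) {
  ac.listAcls(ResourcePatternFilter.ANY)
      .subscribe(bindings -> log.info("{} ACL bindings found", bindings.size()));
}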

View file

@ -0,0 +1,81 @@
package com.provectus.kafka.ui.service.acl;
import com.provectus.kafka.ui.exception.ValidationException;
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.kafka.common.acl.AccessControlEntry;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.acl.AclPermissionType;
import org.apache.kafka.common.resource.PatternType;
import org.apache.kafka.common.resource.ResourcePattern;
import org.apache.kafka.common.resource.ResourceType;
public class AclCsv {
private static final String LINE_SEPARATOR = System.lineSeparator();
private static final String VALUES_SEPARATOR = ",";
private static final String HEADER = "Principal,ResourceType,PatternType,ResourceName,Operation,PermissionType,Host";
public static String transformToCsvString(Collection<AclBinding> acls) {
return Stream.concat(Stream.of(HEADER), acls.stream().map(AclCsv::createAclString))
.collect(Collectors.joining(System.lineSeparator()));
}
public static String createAclString(AclBinding binding) {
var pattern = binding.pattern();
var filter = binding.toFilter().entryFilter();
return String.format(
"%s,%s,%s,%s,%s,%s,%s",
filter.principal(),
pattern.resourceType(),
pattern.patternType(),
pattern.name(),
filter.operation(),
filter.permissionType(),
filter.host()
);
}
private static AclBinding parseCsvLine(String csv, int line) {
String[] values = csv.split(VALUES_SEPARATOR);
if (values.length != 7) {
throw new ValidationException("Input csv is not valid - there should be 7 columns in line " + line);
}
for (int i = 0; i < values.length; i++) {
if ((values[i] = values[i].trim()).isBlank()) {
throw new ValidationException("Input csv is not valid - blank value in colum " + i + ", line " + line);
}
}
try {
return new AclBinding(
new ResourcePattern(
ResourceType.valueOf(values[1]), values[3], PatternType.valueOf(values[2])),
new AccessControlEntry(
values[0], values[6], AclOperation.valueOf(values[4]), AclPermissionType.valueOf(values[5]))
);
} catch (IllegalArgumentException enumParseError) {
throw new ValidationException("Error parsing enum value in line " + line);
}
}
public static Collection<AclBinding> parseCsv(String csvString) {
String[] lines = csvString.split(LINE_SEPARATOR);
if (lines.length == 0) {
throw new ValidationException("Error parsing ACL csv file: no lines in file");
}
boolean firstLineIsHeader = HEADER.equalsIgnoreCase(lines[0].trim().replace(" ", ""));
Set<AclBinding> result = new HashSet<>();
for (int i = firstLineIsHeader ? 1 : 0; i < lines.length; i++) {
String line = lines[i];
if (!line.isBlank()) {
AclBinding aclBinding = parseCsvLine(line, i);
result.add(aclBinding);
}
}
return result;
}
}
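The CSV shape these helpers produce and parse, taken from the HEADER constant above and the test data later in this commit (the header row is optional on input):
Principal,ResourceType,PatternType,ResourceName,Operation,PermissionType,Host
User:test1,TOPIC,LITERAL,*,READ,ALLOW,*
User:test2,GROUP,PREFIXED,group1,DESCRIBE,DENY,localhost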

View file

@ -0,0 +1,93 @@
package com.provectus.kafka.ui.service.acl;
import com.google.common.collect.Sets;
import com.provectus.kafka.ui.model.KafkaCluster;
import com.provectus.kafka.ui.service.AdminClientService;
import java.util.List;
import java.util.Set;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.resource.ResourcePatternFilter;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@Slf4j
@Service
@RequiredArgsConstructor
public class AclsService {
private final AdminClientService adminClientService;
public Mono<Void> createAcl(KafkaCluster cluster, AclBinding aclBinding) {
var aclString = AclCsv.createAclString(aclBinding);
log.info("CREATING ACL: [{}]", aclString);
return adminClientService.get(cluster)
.flatMap(ac -> ac.createAcls(List.of(aclBinding)))
.doOnSuccess(v -> log.info("ACL CREATED: [{}]", aclString));
}
public Mono<Void> deleteAcl(KafkaCluster cluster, AclBinding aclBinding) {
var aclString = AclCsv.createAclString(aclBinding);
log.info("DELETING ACL: [{}]", aclString);
return adminClientService.get(cluster)
.flatMap(ac -> ac.deleteAcls(List.of(aclBinding)))
.doOnSuccess(v -> log.info("ACL DELETED: [{}]", aclString));
}
public Flux<AclBinding> listAcls(KafkaCluster cluster, ResourcePatternFilter filter) {
return adminClientService.get(cluster)
.flatMap(c -> c.listAcls(filter))
.flatMapIterable(acls -> acls);
}
public Mono<String> getAclAsCsvString(KafkaCluster cluster) {
return adminClientService.get(cluster)
.flatMap(c -> c.listAcls(ResourcePatternFilter.ANY))
.map(AclCsv::transformToCsvString);
}
public Mono<Void> syncAclWithAclCsv(KafkaCluster cluster, String csv) {
return adminClientService.get(cluster)
.flatMap(ac -> ac.listAcls(ResourcePatternFilter.ANY).flatMap(existingAclList -> {
var existingSet = Set.copyOf(existingAclList);
var newAcls = Set.copyOf(AclCsv.parseCsv(csv));
var toDelete = Sets.difference(existingSet, newAcls);
var toAdd = Sets.difference(newAcls, existingSet);
logAclSyncPlan(cluster, toAdd, toDelete);
if (toAdd.isEmpty() && toDelete.isEmpty()) {
return Mono.empty();
}
log.info("Starting new ACLs creation");
return ac.createAcls(toAdd)
.doOnSuccess(v -> {
log.info("{} new ACLs created", toAdd.size());
log.info("Starting ACLs deletion");
})
.then(ac.deleteAcls(toDelete)
.doOnSuccess(v -> log.info("{} ACLs deleted", toDelete.size())));
}));
}
private void logAclSyncPlan(KafkaCluster cluster, Set<AclBinding> toBeAdded, Set<AclBinding> toBeDeleted) {
log.info("'{}' cluster ACL sync plan: ", cluster.getName());
if (toBeAdded.isEmpty() && toBeDeleted.isEmpty()) {
log.info("Nothing to do, ACL is already in sync");
return;
}
if (!toBeAdded.isEmpty()) {
log.info("ACLs to be added ({}): ", toBeAdded.size());
for (AclBinding aclBinding : toBeAdded) {
log.info(" " + AclCsv.createAclString(aclBinding));
}
}
if (!toBeDeleted.isEmpty()) {
log.info("ACLs to be deleted ({}): ", toBeDeleted.size());
for (AclBinding aclBinding : toBeDeleted) {
log.info(" " + AclCsv.createAclString(aclBinding));
}
}
}
}

View file

@ -43,8 +43,7 @@ class TopicAnalysisStats {
Long max;
final UpdateDoublesSketch sizeSketch = DoublesSketch.builder().build();
void apply(byte[] bytes) {
int len = bytes.length;
void apply(int len) {
sum += len;
min = minNullable(min, len);
max = maxNullable(max, len);
@ -98,7 +97,7 @@ class TopicAnalysisStats {
if (rec.key() != null) {
byte[] keyBytes = rec.key().get();
keysSize.apply(keyBytes);
keysSize.apply(rec.serializedKeySize());
uniqKeys.update(keyBytes);
} else {
nullKeys++;
@ -106,7 +105,7 @@ class TopicAnalysisStats {
if (rec.value() != null) {
byte[] valueBytes = rec.value().get();
valuesSize.apply(valueBytes);
valuesSize.apply(rec.serializedValueSize());
uniqValues.update(valueBytes);
} else {
nullValues++;

View file

@ -44,7 +44,7 @@ public class DataMasking {
public static DataMasking create(@Nullable List<ClustersProperties.Masking> config) {
return new DataMasking(
Optional.ofNullable(config).orElse(List.of()).stream().map(property -> {
Preconditions.checkNotNull(property.getType(), "masking type not specifed");
Preconditions.checkNotNull(property.getType(), "masking type not specified");
Preconditions.checkArgument(
StringUtils.isNotEmpty(property.getTopicKeysPattern())
|| StringUtils.isNotEmpty(property.getTopicValuesPattern()),

View file

@ -0,0 +1,28 @@
package com.provectus.kafka.ui.service.masking.policies;
import com.provectus.kafka.ui.config.ClustersProperties;
import com.provectus.kafka.ui.exception.ValidationException;
import java.util.regex.Pattern;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;
interface FieldsSelector {
static FieldsSelector create(ClustersProperties.Masking property) {
if (StringUtils.hasText(property.getFieldsNamePattern()) && !CollectionUtils.isEmpty(property.getFields())) {
throw new ValidationException("You can't provide both fieldNames & fieldsNamePattern for masking");
}
if (StringUtils.hasText(property.getFieldsNamePattern())) {
Pattern pattern = Pattern.compile(property.getFieldsNamePattern());
return f -> pattern.matcher(f).matches();
}
if (!CollectionUtils.isEmpty(property.getFields())) {
return f -> property.getFields().contains(f);
}
//no pattern and no field names means all fields should be masked
return fieldName -> true;
}
boolean shouldBeMasked(String fieldName);
}
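FieldsSelector adds a regex alternative to the explicit field list; the two are mutually exclusive, and omitting both masks every field. A config sketch, assuming the usual per-cluster masking block:
kafka:
  clusters:
    - name: local
      masking:
        - type: MASK
          fieldsNamePattern: "card.*"   # regex; alternative to an explicit fields: [...] list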

View file

@ -15,8 +15,8 @@ class Mask extends MaskingPolicy {
private final UnaryOperator<String> masker;
Mask(List<String> fieldNames, List<String> maskingChars) {
super(fieldNames);
Mask(FieldsSelector fieldsSelector, List<String> maskingChars) {
super(fieldsSelector);
this.masker = createMasker(maskingChars);
}
@ -38,22 +38,13 @@ class Mask extends MaskingPolicy {
for (int i = 0; i < input.length(); i++) {
int cp = input.codePointAt(i);
switch (Character.getType(cp)) {
case Character.SPACE_SEPARATOR:
case Character.LINE_SEPARATOR:
case Character.PARAGRAPH_SEPARATOR:
sb.appendCodePoint(cp); // keeping separators as-is
break;
case Character.UPPERCASE_LETTER:
sb.append(maskingChars.get(0));
break;
case Character.LOWERCASE_LETTER:
sb.append(maskingChars.get(1));
break;
case Character.DECIMAL_DIGIT_NUMBER:
sb.append(maskingChars.get(2));
break;
default:
sb.append(maskingChars.get(3));
case Character.SPACE_SEPARATOR,
Character.LINE_SEPARATOR,
Character.PARAGRAPH_SEPARATOR -> sb.appendCodePoint(cp); // keeping separators as-is
case Character.UPPERCASE_LETTER -> sb.append(maskingChars.get(0));
case Character.LOWERCASE_LETTER -> sb.append(maskingChars.get(1));
case Character.DECIMAL_DIGIT_NUMBER -> sb.append(maskingChars.get(2));
default -> sb.append(maskingChars.get(3));
}
}
return sb.toString();
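For illustration: assuming the default masking characters are "X", "x", "n" and "-" (one per branch above), an input of "Card 1234-5678" would come out as "Xxxx nnnn-nnnn" - separators kept as-is, uppercase mapped to the first character, lowercase to the second, digits to the third, everything else to the fourth.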

View file

@ -2,46 +2,36 @@ package com.provectus.kafka.ui.service.masking.policies;
import com.fasterxml.jackson.databind.node.ContainerNode;
import com.provectus.kafka.ui.config.ClustersProperties;
import java.util.List;
import lombok.RequiredArgsConstructor;
@RequiredArgsConstructor
public abstract class MaskingPolicy {
public static MaskingPolicy create(ClustersProperties.Masking property) {
List<String> fields = property.getFields() == null
? List.of() // empty list means that policy will be applied to all fields
: property.getFields();
switch (property.getType()) {
case REMOVE:
return new Remove(fields);
case REPLACE:
return new Replace(
fields,
property.getReplacement() == null
? Replace.DEFAULT_REPLACEMENT
: property.getReplacement()
);
case MASK:
return new Mask(
fields,
property.getPattern() == null
? Mask.DEFAULT_PATTERN
: property.getPattern()
);
default:
throw new IllegalStateException("Unknown policy type: " + property.getType());
}
FieldsSelector fieldsSelector = FieldsSelector.create(property);
return switch (property.getType()) {
case REMOVE -> new Remove(fieldsSelector);
case REPLACE -> new Replace(
fieldsSelector,
property.getReplacement() == null
? Replace.DEFAULT_REPLACEMENT
: property.getReplacement()
);
case MASK -> new Mask(
fieldsSelector,
property.getMaskingCharsReplacement() == null
? Mask.DEFAULT_PATTERN
: property.getMaskingCharsReplacement()
);
};
}
//----------------------------------------------------------------
// empty list means policy will be applied to all fields
private final List<String> fieldNames;
private final FieldsSelector fieldsSelector;
protected boolean fieldShouldBeMasked(String fieldName) {
return fieldNames.isEmpty() || fieldNames.contains(fieldName);
return fieldsSelector.shouldBeMasked(fieldName);
}
public abstract ContainerNode<?> applyToJsonContainer(ContainerNode<?> node);

View file

@ -4,12 +4,12 @@ import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ContainerNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import java.util.List;
class Remove extends MaskingPolicy {
Remove(List<String> fieldNames) {
super(fieldNames);
Remove(FieldsSelector fieldsSelector) {
super(fieldsSelector);
}
@Override

View file

@ -6,7 +6,6 @@ import com.fasterxml.jackson.databind.node.ContainerNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.fasterxml.jackson.databind.node.TextNode;
import com.google.common.base.Preconditions;
import java.util.List;
class Replace extends MaskingPolicy {
@ -14,8 +13,8 @@ class Replace extends MaskingPolicy {
private final String replacement;
Replace(List<String> fieldNames, String replacementString) {
super(fieldNames);
Replace(FieldsSelector fieldsSelector, String replacementString) {
super(fieldsSelector);
this.replacement = Preconditions.checkNotNull(replacementString);
}

View file

@ -61,7 +61,9 @@ class JmxSslSocketFactory extends javax.net.ssl.SSLSocketFactory {
} catch (Exception e) {
log.error("----------------------------------");
log.error("SSL can't be enabled for JMX retrieval. "
+ "Make sure your java app run with '--add-opens java.rmi/javax.rmi.ssl=ALL-UNNAMED' arg.", e);
+ "Make sure your java app run with '--add-opens java.rmi/javax.rmi.ssl=ALL-UNNAMED' arg. Err: {}",
e.getMessage());
log.trace("SSL can't be enabled for JMX retrieval", e);
log.error("----------------------------------");
}
SSL_JMX_SUPPORTED = sslJmxSupported;

View file

@ -12,6 +12,7 @@ import com.provectus.kafka.ui.model.rbac.AccessContext;
import com.provectus.kafka.ui.model.rbac.Permission;
import com.provectus.kafka.ui.model.rbac.Resource;
import com.provectus.kafka.ui.model.rbac.Role;
import com.provectus.kafka.ui.model.rbac.Subject;
import com.provectus.kafka.ui.model.rbac.permission.ConnectAction;
import com.provectus.kafka.ui.model.rbac.permission.ConsumerGroupAction;
import com.provectus.kafka.ui.model.rbac.permission.SchemaAction;
@ -19,11 +20,11 @@ import com.provectus.kafka.ui.model.rbac.permission.TopicAction;
import com.provectus.kafka.ui.service.rbac.extractor.CognitoAuthorityExtractor;
import com.provectus.kafka.ui.service.rbac.extractor.GithubAuthorityExtractor;
import com.provectus.kafka.ui.service.rbac.extractor.GoogleAuthorityExtractor;
import com.provectus.kafka.ui.service.rbac.extractor.LdapAuthorityExtractor;
import com.provectus.kafka.ui.service.rbac.extractor.ProviderAuthorityExtractor;
import jakarta.annotation.PostConstruct;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.function.Predicate;
import java.util.regex.Pattern;
@ -34,6 +35,7 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.core.env.Environment;
import org.springframework.security.access.AccessDeniedException;
import org.springframework.security.core.context.ReactiveSecurityContextHolder;
import org.springframework.security.core.context.SecurityContext;
@ -50,10 +52,11 @@ public class AccessControlService {
@Nullable
private final InMemoryReactiveClientRegistrationRepository clientRegistrationRepository;
private final RoleBasedAccessControlProperties properties;
private final Environment environment;
private boolean rbacEnabled = false;
private Set<ProviderAuthorityExtractor> extractors = Collections.emptySet();
private final RoleBasedAccessControlProperties properties;
private Set<ProviderAuthorityExtractor> oauthExtractors = Collections.emptySet();
@PostConstruct
public void init() {
@ -63,21 +66,26 @@ public class AccessControlService {
}
rbacEnabled = true;
this.extractors = properties.getRoles()
this.oauthExtractors = properties.getRoles()
.stream()
.map(role -> role.getSubjects()
.stream()
.map(provider -> switch (provider.getProvider()) {
.map(Subject::getProvider)
.distinct()
.map(provider -> switch (provider) {
case OAUTH_COGNITO -> new CognitoAuthorityExtractor();
case OAUTH_GOOGLE -> new GoogleAuthorityExtractor();
case OAUTH_GITHUB -> new GithubAuthorityExtractor();
case LDAP, LDAP_AD -> new LdapAuthorityExtractor();
}).collect(Collectors.toSet()))
default -> null;
})
.filter(Objects::nonNull)
.collect(Collectors.toSet()))
.flatMap(Set::stream)
.collect(Collectors.toSet());
if ((clientRegistrationRepository == null || !clientRegistrationRepository.iterator().hasNext())
&& !properties.getRoles().isEmpty()) {
if (!properties.getRoles().isEmpty()
&& "oauth2".equalsIgnoreCase(environment.getProperty("auth.type"))
&& (clientRegistrationRepository == null || !clientRegistrationRepository.iterator().hasNext())) {
log.error("Roles are configured but no authentication methods are present. Authentication might fail.");
}
}
@ -354,8 +362,8 @@ public class AccessControlService {
return isAccessible(Resource.KSQL, null, user, context, requiredActions);
}
public Set<ProviderAuthorityExtractor> getExtractors() {
return extractors;
public Set<ProviderAuthorityExtractor> getOauthExtractors() {
return oauthExtractors;
}
public List<Role> getRoles() {

View file

@ -1,23 +0,0 @@
package com.provectus.kafka.ui.service.rbac.extractor;
import com.provectus.kafka.ui.service.rbac.AccessControlService;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Mono;
@Slf4j
public class LdapAuthorityExtractor implements ProviderAuthorityExtractor {
@Override
public boolean isApplicable(String provider) {
return false; // TODO #2752
}
@Override
public Mono<Set<String>> extract(AccessControlService acs, Object value, Map<String, Object> additionalParams) {
return Mono.just(Collections.emptySet()); // TODO #2752
}
}

View file

@ -0,0 +1,70 @@
package com.provectus.kafka.ui.service.rbac.extractor;
import com.provectus.kafka.ui.config.auth.LdapProperties;
import com.provectus.kafka.ui.model.rbac.Role;
import com.provectus.kafka.ui.model.rbac.provider.Provider;
import com.provectus.kafka.ui.service.rbac.AccessControlService;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.ApplicationContext;
import org.springframework.ldap.core.DirContextOperations;
import org.springframework.ldap.core.support.BaseLdapPathContextSource;
import org.springframework.security.core.GrantedAuthority;
import org.springframework.security.core.authority.SimpleGrantedAuthority;
import org.springframework.security.ldap.userdetails.DefaultLdapAuthoritiesPopulator;
import org.springframework.util.Assert;
@Slf4j
public class RbacLdapAuthoritiesExtractor extends DefaultLdapAuthoritiesPopulator {
private final AccessControlService acs;
private final LdapProperties props;
private final Function<Map<String, List<String>>, GrantedAuthority> authorityMapper = (record) -> {
String role = record.get(getGroupRoleAttribute()).get(0);
return new SimpleGrantedAuthority(role);
};
public RbacLdapAuthoritiesExtractor(ApplicationContext context) {
super(context.getBean(BaseLdapPathContextSource.class), null);
this.acs = context.getBean(AccessControlService.class);
this.props = context.getBean(LdapProperties.class);
}
@Override
public Set<GrantedAuthority> getAdditionalRoles(DirContextOperations user, String username) {
return acs.getRoles()
.stream()
.map(Role::getSubjects)
.flatMap(List::stream)
.filter(s -> s.getProvider().equals(Provider.LDAP))
.filter(s -> s.getType().equals("group"))
.flatMap(subject -> getRoles(subject.getValue(), user.getNameInNamespace(), username).stream())
.collect(Collectors.toSet());
}
private Set<GrantedAuthority> getRoles(String groupSearchBase, String userDn, String username) {
Assert.notNull(groupSearchBase, "groupSearchBase is empty");
log.trace(
"Searching for roles for user [{}] with DN [{}], groupRoleAttribute [{}] and filter [{}] in search base [{}]",
username, userDn, props.getGroupRoleAttribute(), getGroupSearchFilter(), groupSearchBase);
var ldapTemplate = getLdapTemplate();
ldapTemplate.setIgnoreNameNotFoundException(true);
Set<Map<String, List<String>>> userRoles = ldapTemplate.searchForMultipleAttributeValues(
groupSearchBase, getGroupSearchFilter(), new String[] {userDn, username},
new String[] {props.getGroupRoleAttribute()});
return userRoles.stream()
.map(authorityMapper)
.peek(a -> log.debug("Mapped role [{}] for user [{}]", a, username))
.collect(Collectors.toSet());
}
}
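The populator resolves roles by matching LDAP groups against role subjects with provider ldap and type group. A sketch of such a role definition (key names assumed from the Subject accessors used above):
rbac:
  roles:
    - name: "admins"
      subjects:
        - provider: ldap
          type: group
          value: "kafka-admins"   # passed to getRoles() as the group search base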

View file

@ -2,6 +2,7 @@ package com.provectus.kafka.ui.util;
import com.provectus.kafka.ui.config.ClustersProperties;
import com.provectus.kafka.ui.config.WebclientProperties;
import com.provectus.kafka.ui.config.auth.OAuthProperties;
import com.provectus.kafka.ui.config.auth.RoleBasedAccessControlProperties;
import com.provectus.kafka.ui.exception.FileUploadException;
@ -89,6 +90,7 @@ public class DynamicConfigOperations {
}
public PropertiesStructure getCurrentProperties() {
checkIfDynamicConfigEnabled();
return PropertiesStructure.builder()
.kafka(getNullableBean(ClustersProperties.class))
.rbac(getNullableBean(RoleBasedAccessControlProperties.class))
@ -97,6 +99,7 @@ public class DynamicConfigOperations {
.type(ctx.getEnvironment().getProperty("auth.type"))
.oauth2(getNullableBean(OAuthProperties.class))
.build())
.webclient(getNullableBean(WebclientProperties.class))
.build();
}
@ -110,11 +113,7 @@ public class DynamicConfigOperations {
}
public void persist(PropertiesStructure properties) {
if (!dynamicConfigEnabled()) {
throw new ValidationException(
"Dynamic config change is not allowed. "
+ "Set dynamic.config.enabled property to 'true' to enabled it.");
}
checkIfDynamicConfigEnabled();
properties.initAndValidate();
String yaml = serializeToYaml(properties);
@ -122,8 +121,9 @@ public class DynamicConfigOperations {
}
public Mono<Path> uploadConfigRelatedFile(FilePart file) {
String targetDirStr = (String) ctx.getEnvironment().getSystemEnvironment()
.getOrDefault(CONFIG_RELATED_UPLOADS_DIR_PROPERTY, CONFIG_RELATED_UPLOADS_DIR_DEFAULT);
checkIfDynamicConfigEnabled();
String targetDirStr = ctx.getEnvironment()
.getProperty(CONFIG_RELATED_UPLOADS_DIR_PROPERTY, CONFIG_RELATED_UPLOADS_DIR_DEFAULT);
Path targetDir = Path.of(targetDirStr);
if (!Files.exists(targetDir)) {
@ -147,6 +147,14 @@ public class DynamicConfigOperations {
.onErrorMap(th -> new FileUploadException(targetFilePath, th));
}
private void checkIfDynamicConfigEnabled() {
if (!dynamicConfigEnabled()) {
throw new ValidationException(
"Dynamic config change is not allowed. "
+ "Set dynamic.config.enabled property to 'true' to enabled it.");
}
}
@SneakyThrows
private void writeYamlToFile(String yaml, Path path) {
if (Files.isDirectory(path)) {
@ -204,6 +212,7 @@ public class DynamicConfigOperations {
private ClustersProperties kafka;
private RoleBasedAccessControlProperties rbac;
private Auth auth;
private WebclientProperties webclient;
@Data
@Builder
@ -222,6 +231,9 @@ public class DynamicConfigOperations {
Optional.ofNullable(auth)
.flatMap(a -> Optional.ofNullable(a.oauth2))
.ifPresent(OAuthProperties::validate);
Optional.ofNullable(webclient)
.ifPresent(WebclientProperties::validate);
}
}

View file

@ -1,24 +1,21 @@
package com.provectus.kafka.ui.util;
import lombok.extern.slf4j.Slf4j;
import java.util.Optional;
@Slf4j
public final class KafkaVersion {
private KafkaVersion() {
}
public static float parse(String version) throws NumberFormatException {
log.trace("Parsing cluster version [{}]", version);
public static Optional<Float> parse(String version) throws NumberFormatException {
try {
final String[] parts = version.split("\\.");
if (parts.length > 2) {
version = parts[0] + "." + parts[1];
}
return Float.parseFloat(version.split("-")[0]);
return Optional.of(Float.parseFloat(version.split("-")[0]));
} catch (Exception e) {
log.error("Conversion clusterVersion [{}] to float value failed", version, e);
throw e;
return Optional.empty();
}
}
}
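The parser now degrades to Optional.empty() instead of propagating parse failures. A behavior sketch:
// KafkaVersion.parse("3.3.1")          -> Optional.of(3.3f)  (major.minor only)
// KafkaVersion.parse("2.8.1-SNAPSHOT") -> Optional.of(2.8f)  (suffix after '-' dropped)
// KafkaVersion.parse("unknown")        -> Optional.empty()   (previously threw)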

View file

@ -4,9 +4,9 @@ import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
// Specifies field that can contain any kind of value - primitive, complex and nulls
public class AnyFieldSchema implements FieldSchema {
class AnyFieldSchema implements FieldSchema {
public static AnyFieldSchema get() {
static AnyFieldSchema get() {
return new AnyFieldSchema();
}

View file

@ -4,10 +4,10 @@ import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
public class ArrayFieldSchema implements FieldSchema {
class ArrayFieldSchema implements FieldSchema {
private final FieldSchema itemsSchema;
public ArrayFieldSchema(FieldSchema itemsSchema) {
ArrayFieldSchema(FieldSchema itemsSchema) {
this.itemsSchema = itemsSchema;
}

View file

@ -7,10 +7,10 @@ import java.util.List;
import java.util.Map;
public class EnumJsonType extends JsonType {
class EnumJsonType extends JsonType {
private final List<String> values;
public EnumJsonType(List<String> values) {
EnumJsonType(List<String> values) {
super(Type.ENUM);
this.values = values;
}

View file

@ -3,6 +3,6 @@ package com.provectus.kafka.ui.util.jsonschema;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
public interface FieldSchema {
interface FieldSchema {
JsonNode toJsonNode(ObjectMapper mapper);
}

View file

@ -4,7 +4,7 @@ import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.Map;
public abstract class JsonType {
abstract class JsonType {
protected final Type type;
@ -12,13 +12,13 @@ public abstract class JsonType {
this.type = type;
}
public Type getType() {
Type getType() {
return type;
}
public abstract Map<String, JsonNode> toJsonNode(ObjectMapper mapper);
abstract Map<String, JsonNode> toJsonNode(ObjectMapper mapper);
public enum Type {
enum Type {
NULL,
BOOLEAN,
OBJECT,

View file

@ -2,21 +2,27 @@ package com.provectus.kafka.ui.util.jsonschema;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.BooleanNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.fasterxml.jackson.databind.node.TextNode;
import javax.annotation.Nullable;
public class MapFieldSchema implements FieldSchema {
private final FieldSchema itemSchema;
class MapFieldSchema implements FieldSchema {
private final @Nullable FieldSchema itemSchema;
public MapFieldSchema(FieldSchema itemSchema) {
MapFieldSchema(@Nullable FieldSchema itemSchema) {
this.itemSchema = itemSchema;
}
MapFieldSchema() {
this(null);
}
@Override
public JsonNode toJsonNode(ObjectMapper mapper) {
final ObjectNode objectNode = mapper.createObjectNode();
objectNode.set("type", new TextNode(JsonType.Type.OBJECT.getName()));
objectNode.set("additionalProperties", itemSchema.toJsonNode(mapper));
objectNode.set("additionalProperties", itemSchema != null ? itemSchema.toJsonNode(mapper) : BooleanNode.TRUE);
return objectNode;
}
}
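With a null item schema the map field now renders as an open-ended object; the emitted JSON Schema fragment looks like:
{ "type": "object", "additionalProperties": true }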

View file

@ -9,24 +9,24 @@ import java.util.stream.Collectors;
import reactor.util.function.Tuple2;
import reactor.util.function.Tuples;
public class ObjectFieldSchema implements FieldSchema {
class ObjectFieldSchema implements FieldSchema {
public static final ObjectFieldSchema EMPTY = new ObjectFieldSchema(Map.of(), List.of());
static final ObjectFieldSchema EMPTY = new ObjectFieldSchema(Map.of(), List.of());
private final Map<String, FieldSchema> properties;
private final List<String> required;
public ObjectFieldSchema(Map<String, FieldSchema> properties,
ObjectFieldSchema(Map<String, FieldSchema> properties,
List<String> required) {
this.properties = properties;
this.required = required;
}
public Map<String, FieldSchema> getProperties() {
Map<String, FieldSchema> getProperties() {
return properties;
}
public List<String> getRequired() {
List<String> getRequired() {
return required;
}

View file

@ -5,11 +5,10 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.List;
import java.util.stream.Collectors;
public class OneOfFieldSchema implements FieldSchema {
class OneOfFieldSchema implements FieldSchema {
private final List<FieldSchema> schemaList;
public OneOfFieldSchema(
List<FieldSchema> schemaList) {
OneOfFieldSchema(List<FieldSchema> schemaList) {
this.schemaList = schemaList;
}

View file

@ -94,6 +94,9 @@ public class ProtobufSchemaConverter implements JsonSchemaConverter<Descriptors.
if (wellKnownTypeSchema.isPresent()) {
return wellKnownTypeSchema.get();
}
if (field.isMapField()) {
return new MapFieldSchema();
}
final JsonType jsonType = convertType(field);
FieldSchema fieldSchema;
if (jsonType.getType().equals(JsonType.Type.OBJECT)) {
@ -149,67 +152,47 @@ public class ProtobufSchemaConverter implements JsonSchemaConverter<Descriptors.
}
private JsonType convertType(Descriptors.FieldDescriptor field) {
switch (field.getType()) {
case INT32:
case FIXED32:
case SFIXED32:
case SINT32:
return new SimpleJsonType(
JsonType.Type.INTEGER,
Map.of(
"maximum", IntNode.valueOf(Integer.MAX_VALUE),
"minimum", IntNode.valueOf(Integer.MIN_VALUE)
)
);
case UINT32:
return new SimpleJsonType(
JsonType.Type.INTEGER,
Map.of(
"maximum", LongNode.valueOf(UnsignedInteger.MAX_VALUE.longValue()),
"minimum", IntNode.valueOf(0)
)
);
return switch (field.getType()) {
case INT32, FIXED32, SFIXED32, SINT32 -> new SimpleJsonType(
JsonType.Type.INTEGER,
Map.of(
"maximum", IntNode.valueOf(Integer.MAX_VALUE),
"minimum", IntNode.valueOf(Integer.MIN_VALUE)
)
);
case UINT32 -> new SimpleJsonType(
JsonType.Type.INTEGER,
Map.of(
"maximum", LongNode.valueOf(UnsignedInteger.MAX_VALUE.longValue()),
"minimum", IntNode.valueOf(0)
)
);
//TODO: actually all *64 types will be printed with quotes (as strings),
// see JsonFormat::printSingleFieldValue for impl. This can cause problems when you copy-paste from messages
// table to `Produce` area - need to think if it is critical or not.
case INT64:
case FIXED64:
case SFIXED64:
case SINT64:
return new SimpleJsonType(
JsonType.Type.INTEGER,
Map.of(
"maximum", LongNode.valueOf(Long.MAX_VALUE),
"minimum", LongNode.valueOf(Long.MIN_VALUE)
)
);
case UINT64:
return new SimpleJsonType(
JsonType.Type.INTEGER,
Map.of(
"maximum", new BigIntegerNode(UnsignedLong.MAX_VALUE.bigIntegerValue()),
"minimum", LongNode.valueOf(0)
)
);
case MESSAGE:
case GROUP:
return new SimpleJsonType(JsonType.Type.OBJECT);
case ENUM:
return new EnumJsonType(
field.getEnumType().getValues().stream()
.map(Descriptors.EnumValueDescriptor::getName)
.collect(Collectors.toList())
);
case BYTES:
case STRING:
return new SimpleJsonType(JsonType.Type.STRING);
case FLOAT:
case DOUBLE:
return new SimpleJsonType(JsonType.Type.NUMBER);
case BOOL:
return new SimpleJsonType(JsonType.Type.BOOLEAN);
default:
return new SimpleJsonType(JsonType.Type.STRING);
}
case INT64, FIXED64, SFIXED64, SINT64 -> new SimpleJsonType(
JsonType.Type.INTEGER,
Map.of(
"maximum", LongNode.valueOf(Long.MAX_VALUE),
"minimum", LongNode.valueOf(Long.MIN_VALUE)
)
);
case UINT64 -> new SimpleJsonType(
JsonType.Type.INTEGER,
Map.of(
"maximum", new BigIntegerNode(UnsignedLong.MAX_VALUE.bigIntegerValue()),
"minimum", LongNode.valueOf(0)
)
);
case MESSAGE, GROUP -> new SimpleJsonType(JsonType.Type.OBJECT);
case ENUM -> new EnumJsonType(
field.getEnumType().getValues().stream()
.map(Descriptors.EnumValueDescriptor::getName)
.collect(Collectors.toList())
);
case BYTES, STRING -> new SimpleJsonType(JsonType.Type.STRING);
case FLOAT, DOUBLE -> new SimpleJsonType(JsonType.Type.NUMBER);
case BOOL -> new SimpleJsonType(JsonType.Type.BOOLEAN);
};
}
}
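As a concrete instance of the UINT32 branch above, a uint32 protobuf field maps to:
{ "type": "integer", "minimum": 0, "maximum": 4294967295 }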

View file

@ -4,10 +4,10 @@ import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.TextNode;
public class RefFieldSchema implements FieldSchema {
class RefFieldSchema implements FieldSchema {
private final String ref;
public RefFieldSchema(String ref) {
RefFieldSchema(String ref) {
this.ref = ref;
}
@ -16,7 +16,7 @@ public class RefFieldSchema implements FieldSchema {
return mapper.createObjectNode().set("$ref", new TextNode(ref));
}
public String getRef() {
String getRef() {
return ref;
}
}

View file

@ -3,10 +3,10 @@ package com.provectus.kafka.ui.util.jsonschema;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
public class SimpleFieldSchema implements FieldSchema {
class SimpleFieldSchema implements FieldSchema {
private final JsonType type;
public SimpleFieldSchema(JsonType type) {
SimpleFieldSchema(JsonType type) {
this.type = type;
}

View file

@ -6,15 +6,15 @@ import com.fasterxml.jackson.databind.node.TextNode;
import com.google.common.collect.ImmutableMap;
import java.util.Map;
public class SimpleJsonType extends JsonType {
class SimpleJsonType extends JsonType {
private final Map<String, JsonNode> additionalTypeProperties;
public SimpleJsonType(Type type) {
SimpleJsonType(Type type) {
this(type, Map.of());
}
public SimpleJsonType(Type type, Map<String, JsonNode> additionalTypeProperties) {
SimpleJsonType(Type type, Map<String, JsonNode> additionalTypeProperties) {
super(type);
this.additionalTypeProperties = additionalTypeProperties;
}

View file

@ -2,6 +2,7 @@ package com.provectus.kafka.ui;
import com.provectus.kafka.ui.container.KafkaConnectContainer;
import com.provectus.kafka.ui.container.SchemaRegistryContainer;
import java.nio.file.Path;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
@ -9,6 +10,7 @@ import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.jetbrains.annotations.NotNull;
import org.junit.jupiter.api.function.ThrowingConsumer;
import org.junit.jupiter.api.io.TempDir;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.autoconfigure.web.reactive.AutoConfigureWebTestClient;
import org.springframework.boot.test.context.SpringBootTest;
@ -47,6 +49,9 @@ public abstract class AbstractIntegrationTest {
.dependsOn(kafka)
.dependsOn(schemaRegistry);
@TempDir
public static Path tmpDir;
static {
kafka.start();
schemaRegistry.start();
@ -76,6 +81,9 @@ public abstract class AbstractIntegrationTest {
System.setProperty("kafka.clusters.1.schemaRegistry", schemaRegistry.getUrl());
System.setProperty("kafka.clusters.1.kafkaConnect.0.name", "kafka-connect");
System.setProperty("kafka.clusters.1.kafkaConnect.0.address", kafkaConnect.getTarget());
System.setProperty("dynamic.config.enabled", "true");
System.setProperty("config.related.uploads.dir", tmpDir.toString());
}
}

View file

@ -0,0 +1,49 @@
package com.provectus.kafka.ui.controller;
import static org.assertj.core.api.Assertions.assertThat;
import com.provectus.kafka.ui.AbstractIntegrationTest;
import com.provectus.kafka.ui.model.UploadedFileInfoDTO;
import java.io.IOException;
import java.nio.file.Path;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.io.ClassPathResource;
import org.springframework.http.HttpEntity;
import org.springframework.http.client.MultipartBodyBuilder;
import org.springframework.test.web.reactive.server.WebTestClient;
import org.springframework.util.MultiValueMap;
class ApplicationConfigControllerTest extends AbstractIntegrationTest {
@Autowired
private WebTestClient webTestClient;
@Test
public void testUpload() throws IOException {
var fileToUpload = new ClassPathResource("/fileForUploadTest.txt", this.getClass());
UploadedFileInfoDTO result = webTestClient
.post()
.uri("/api/config/relatedfiles")
.bodyValue(generateBody(fileToUpload))
.exchange()
.expectStatus()
.isOk()
.expectBody(UploadedFileInfoDTO.class)
.returnResult()
.getResponseBody();
assertThat(result).isNotNull();
assertThat(result.getLocation()).isNotNull();
assertThat(Path.of(result.getLocation()))
.hasSameBinaryContentAs(fileToUpload.getFile().toPath());
}
private MultiValueMap<String, HttpEntity<?>> generateBody(ClassPathResource resource) {
MultipartBodyBuilder builder = new MultipartBodyBuilder();
builder.part("file", resource);
return builder.build();
}
}

View file

@ -0,0 +1,83 @@
package com.provectus.kafka.ui.model;
import static org.assertj.core.api.Assertions.assertThat;
import com.provectus.kafka.ui.service.ReactiveAdminClient;
import java.math.BigDecimal;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartitionInfo;
import org.assertj.core.data.Percentage;
import org.junit.jupiter.api.Test;
class PartitionDistributionStatsTest {
@Test
void skewCalculatedBasedOnPartitionsCounts() {
Node n1 = new Node(1, "n1", 9092);
Node n2 = new Node(2, "n2", 9092);
Node n3 = new Node(3, "n3", 9092);
Node n4 = new Node(4, "n4", 9092);
var stats = PartitionDistributionStats.create(
Statistics.builder()
.clusterDescription(
new ReactiveAdminClient.ClusterDescription(null, "test", Set.of(n1, n2, n3), null))
.topicDescriptions(
Map.of(
"t1", new TopicDescription(
"t1", false,
List.of(
new TopicPartitionInfo(0, n1, List.of(n1, n2), List.of(n1, n2)),
new TopicPartitionInfo(1, n2, List.of(n2, n3), List.of(n2, n3))
)
),
"t2", new TopicDescription(
"t2", false,
List.of(
new TopicPartitionInfo(0, n1, List.of(n1, n2), List.of(n1, n2)),
new TopicPartitionInfo(1, null, List.of(n2, n1), List.of(n1))
)
)
)
)
.build(), 4
);
assertThat(stats.getPartitionLeaders())
.containsExactlyInAnyOrderEntriesOf(Map.of(n1, 2, n2, 1));
assertThat(stats.getPartitionsCount())
.containsExactlyInAnyOrderEntriesOf(Map.of(n1, 3, n2, 4, n3, 1));
assertThat(stats.getInSyncPartitions())
.containsExactlyInAnyOrderEntriesOf(Map.of(n1, 3, n2, 3, n3, 1));
// Node(partitions): n1(3), n2(4), n3(1), n4(0)
// average partitions cnt = (3+4+1) / 3 = 2.666 (counting only nodes with partitions!)
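// skew % = (brokerValue - avg) / avg * 100; e.g. n1: (3 - 2.666) / 2.666 * 100 = 12.5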
assertThat(stats.getAvgPartitionsPerBroker())
.isCloseTo(2.666, Percentage.withPercentage(1));
assertThat(stats.partitionsSkew(n1))
.isCloseTo(BigDecimal.valueOf(12.5), Percentage.withPercentage(1));
assertThat(stats.partitionsSkew(n2))
.isCloseTo(BigDecimal.valueOf(50), Percentage.withPercentage(1));
assertThat(stats.partitionsSkew(n3))
.isCloseTo(BigDecimal.valueOf(-62.5), Percentage.withPercentage(1));
assertThat(stats.partitionsSkew(n4))
.isCloseTo(BigDecimal.valueOf(-100), Percentage.withPercentage(1));
// Node(leaders): n1(2), n2(1), n3(0), n4(0)
// average leaders cnt = (2+1) / 2 = 1.5 (counting only nodes with leaders!)
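// same formula over leader counts; e.g. n1: (2 - 1.5) / 1.5 * 100 = 33.33, and nodes without leaders get -100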
assertThat(stats.leadersSkew(n1))
.isCloseTo(BigDecimal.valueOf(33.33), Percentage.withPercentage(1));
assertThat(stats.leadersSkew(n2))
.isCloseTo(BigDecimal.valueOf(-33.33), Percentage.withPercentage(1));
assertThat(stats.leadersSkew(n3))
.isCloseTo(BigDecimal.valueOf(-100), Percentage.withPercentage(1));
assertThat(stats.leadersSkew(n4))
.isCloseTo(BigDecimal.valueOf(-100), Percentage.withPercentage(1));
}
}

View file

@@ -0,0 +1,70 @@
package com.provectus.kafka.ui.service.acl;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import com.provectus.kafka.ui.exception.ValidationException;
import java.util.Collection;
import java.util.List;
import org.apache.kafka.common.acl.AccessControlEntry;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.acl.AclPermissionType;
import org.apache.kafka.common.resource.PatternType;
import org.apache.kafka.common.resource.ResourcePattern;
import org.apache.kafka.common.resource.ResourceType;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
class AclCsvTest {
private static final List<AclBinding> TEST_BINDINGS = List.of(
new AclBinding(
new ResourcePattern(ResourceType.TOPIC, "*", PatternType.LITERAL),
new AccessControlEntry("User:test1", "*", AclOperation.READ, AclPermissionType.ALLOW)),
new AclBinding(
new ResourcePattern(ResourceType.GROUP, "group1", PatternType.PREFIXED),
new AccessControlEntry("User:test2", "localhost", AclOperation.DESCRIBE, AclPermissionType.DENY))
);
@ParameterizedTest
@ValueSource(strings = {
"Principal,ResourceType, PatternType, ResourceName,Operation,PermissionType,Host\n"
+ "User:test1,TOPIC,LITERAL,*,READ,ALLOW,*\n"
+ "User:test2,GROUP,PREFIXED,group1,DESCRIBE,DENY,localhost",
// without header; blank lines between rows are tolerated
"User:test1,TOPIC,LITERAL,*,READ,ALLOW,*\n"
+ "\n"
+ "User:test2,GROUP,PREFIXED,group1,DESCRIBE,DENY,localhost"
+ "\n"
})
void parsesValidInputCsv(String csvString) {
Collection<AclBinding> parsed = AclCsv.parseCsv(csvString);
assertThat(parsed).containsExactlyInAnyOrderElementsOf(TEST_BINDINGS);
}
@ParameterizedTest
@ValueSource(strings = {
// columns > 7
"User:test1,TOPIC,LITERAL,*,READ,ALLOW,*,1,2,3,4",
// columns < 7
"User:test1,TOPIC,LITERAL,*",
// enum values are illegal
"User:test1,ILLEGAL,LITERAL,*,READ,ALLOW,*",
"User:test1,TOPIC,LITERAL,*,READ,ILLEGAL,*"
})
void throwsExceptionForInvalidInputCsv(String csvString) {
assertThatThrownBy(() -> AclCsv.parseCsv(csvString))
.isInstanceOf(ValidationException.class);
}
@Test
void transformAndParseUseSameFormat() {
String csv = AclCsv.transformToCsvString(TEST_BINDINGS);
Collection<AclBinding> parsedBindings = AclCsv.parseCsv(csv);
assertThat(parsedBindings).containsExactlyInAnyOrderElementsOf(TEST_BINDINGS);
}
}

View file

@@ -0,0 +1,82 @@
package com.provectus.kafka.ui.service.acl;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import com.provectus.kafka.ui.model.KafkaCluster;
import com.provectus.kafka.ui.service.AdminClientService;
import com.provectus.kafka.ui.service.ReactiveAdminClient;
import java.util.Collection;
import java.util.List;
import org.apache.kafka.common.acl.AccessControlEntry;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.acl.AclPermissionType;
import org.apache.kafka.common.resource.PatternType;
import org.apache.kafka.common.resource.ResourcePattern;
import org.apache.kafka.common.resource.ResourcePatternFilter;
import org.apache.kafka.common.resource.ResourceType;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
import reactor.core.publisher.Mono;
class AclsServiceTest {
private static final KafkaCluster CLUSTER = KafkaCluster.builder().build();
private final ReactiveAdminClient adminClientMock = mock(ReactiveAdminClient.class);
private final AdminClientService adminClientService = mock(AdminClientService.class);
private final AclsService aclsService = new AclsService(adminClientService);
@BeforeEach
void initMocks() {
when(adminClientService.get(CLUSTER)).thenReturn(Mono.just(adminClientMock));
}
@Test
void testSyncAclWithAclCsv() {
var existingBinding1 = new AclBinding(
new ResourcePattern(ResourceType.TOPIC, "*", PatternType.LITERAL),
new AccessControlEntry("User:test1", "*", AclOperation.READ, AclPermissionType.ALLOW));
var existingBinding2 = new AclBinding(
new ResourcePattern(ResourceType.GROUP, "group1", PatternType.PREFIXED),
new AccessControlEntry("User:test2", "localhost", AclOperation.DESCRIBE, AclPermissionType.DENY));
var newBindingToBeAdded = new AclBinding(
new ResourcePattern(ResourceType.GROUP, "groupNew", PatternType.PREFIXED),
new AccessControlEntry("User:test3", "localhost", AclOperation.DESCRIBE, AclPermissionType.DENY));
when(adminClientMock.listAcls(ResourcePatternFilter.ANY))
.thenReturn(Mono.just(List.of(existingBinding1, existingBinding2)));
ArgumentCaptor<?> createdCaptor = ArgumentCaptor.forClass(Collection.class);
when(adminClientMock.createAcls((Collection<AclBinding>) createdCaptor.capture()))
.thenReturn(Mono.empty());
ArgumentCaptor<?> deletedCaptor = ArgumentCaptor.forClass(Collection.class);
when(adminClientMock.deleteAcls((Collection<AclBinding>) deletedCaptor.capture()))
.thenReturn(Mono.empty());
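// the CSV below re-declares existingBinding1 and adds newBindingToBeAdded, so the sync
// should create the new binding and delete existingBinding2, which is absent from the CSV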
aclsService.syncAclWithAclCsv(
CLUSTER,
"Principal,ResourceType, PatternType, ResourceName,Operation,PermissionType,Host\n"
+ "User:test1,TOPIC,LITERAL,*,READ,ALLOW,*\n"
+ "User:test3,GROUP,PREFIXED,groupNew,DESCRIBE,DENY,localhost"
).block();
Collection<AclBinding> createdBindings = (Collection<AclBinding>) createdCaptor.getValue();
assertThat(createdBindings)
.hasSize(1)
.contains(newBindingToBeAdded);
Collection<AclBinding> deletedBindings = (Collection<AclBinding>) deletedCaptor.getValue();
assertThat(deletedBindings)
.hasSize(1)
.contains(existingBinding2);
}
}

View file

@@ -0,0 +1,53 @@
package com.provectus.kafka.ui.service.masking.policies;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import com.provectus.kafka.ui.config.ClustersProperties;
import com.provectus.kafka.ui.exception.ValidationException;
import java.util.List;
import org.junit.jupiter.api.Test;
class FieldsSelectorTest {
@Test
void selectsFieldsDueToProvidedPattern() {
var properties = new ClustersProperties.Masking();
properties.setFieldsNamePattern("f1|f2");
var selector = FieldsSelector.create(properties);
assertThat(selector.shouldBeMasked("f1")).isTrue();
assertThat(selector.shouldBeMasked("f2")).isTrue();
assertThat(selector.shouldBeMasked("doesNotMatchPattern")).isFalse();
}
@Test
void selectsFieldsDueToProvidedFieldNames() {
var properties = new ClustersProperties.Masking();
properties.setFields(List.of("f1", "f2"));
var selector = FieldsSelector.create(properties);
assertThat(selector.shouldBeMasked("f1")).isTrue();
assertThat(selector.shouldBeMasked("f2")).isTrue();
assertThat(selector.shouldBeMasked("notInAList")).isFalse();
}
@Test
void selectAllFieldsIfNoPatternAndNoNamesProvided() {
var properties = new ClustersProperties.Masking();
var selector = FieldsSelector.create(properties);
assertThat(selector.shouldBeMasked("anyPropertyName")).isTrue();
}
@Test
void throwsExceptionIfBothFieldListAndPatternProvided() {
var properties = new ClustersProperties.Masking();
properties.setFieldsNamePattern("f1|f2");
properties.setFields(List.of("f3", "f4"));
assertThatThrownBy(() -> FieldsSelector.create(properties))
.isInstanceOf(ValidationException.class);
}
}
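Together these tests pin down the create() contract. Below is a sketch of an implementation that would satisfy them, inferred from the tests alone; the getter names and the error message are assumptions, not the actual source:

// hypothetical sketch of FieldsSelector.create(ClustersProperties.Masking)
static FieldsSelector create(ClustersProperties.Masking props) {
  if (props.getFieldsNamePattern() != null && props.getFields() != null) {
    // both criteria at once is ambiguous, as throwsExceptionIfBothFieldListAndPatternProvided expects
    throw new ValidationException("fields and fieldsNamePattern are mutually exclusive"); // assumed message
  }
  if (props.getFieldsNamePattern() != null) {
    java.util.regex.Pattern pattern = java.util.regex.Pattern.compile(props.getFieldsNamePattern());
    return fieldName -> pattern.matcher(fieldName).matches();
  }
  if (props.getFields() != null) {
    return fieldName -> props.getFields().contains(fieldName);
  }
  return fieldName -> true; // no criteria configured: mask every field
}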

View file

@@ -15,35 +15,35 @@ import org.junit.jupiter.params.provider.MethodSource;
class MaskTest {
-private static final List<String> TARGET_FIELDS = List.of("id", "name");
+private static final FieldsSelector FIELDS_SELECTOR = fieldName -> List.of("id", "name").contains(fieldName);
private static final List<String> PATTERN = List.of("X", "x", "n", "-");
@ParameterizedTest
@MethodSource
-void testApplyToJsonContainer(List<String> fields, ContainerNode<?> original, ContainerNode<?> expected) {
-Mask policy = new Mask(fields, PATTERN);
+void testApplyToJsonContainer(FieldsSelector selector, ContainerNode<?> original, ContainerNode<?> expected) {
+Mask policy = new Mask(selector, PATTERN);
assertThat(policy.applyToJsonContainer(original)).isEqualTo(expected);
}
private static Stream<Arguments> testApplyToJsonContainer() {
return Stream.of(
Arguments.of(
-TARGET_FIELDS,
+FIELDS_SELECTOR,
parse("{ \"id\": 123, \"name\": { \"first\": \"James\", \"surname\": \"Bond777!\"}}"),
parse("{ \"id\": \"nnn\", \"name\": { \"first\": \"Xxxxx\", \"surname\": \"Xxxxnnn-\"}}")
),
Arguments.of(
-TARGET_FIELDS,
+FIELDS_SELECTOR,
parse("[{ \"id\": 123, \"f2\": 234}, { \"name\": \"1.2\", \"f2\": 345} ]"),
parse("[{ \"id\": \"nnn\", \"f2\": 234}, { \"name\": \"n-n\", \"f2\": 345} ]")
),
Arguments.of(
-TARGET_FIELDS,
+FIELDS_SELECTOR,
parse("{ \"outer\": { \"f1\": \"James\", \"name\": \"Bond777!\"}}"),
parse("{ \"outer\": { \"f1\": \"James\", \"name\": \"Xxxxnnn-\"}}")
),
Arguments.of(
-List.of(),
+(FieldsSelector) (fieldName -> true),
parse("{ \"outer\": { \"f1\": \"James\", \"name\": \"Bond777!\"}}"),
parse("{ \"outer\": { \"f1\": \"Xxxxx\", \"name\": \"Xxxxnnn-\"}}")
)
@@ -57,7 +57,7 @@ class MaskTest {
"null, xxxx"
})
void testApplyToString(String original, String expected) {
-Mask policy = new Mask(List.of(), PATTERN);
+Mask policy = new Mask(fieldName -> true, PATTERN);
assertThat(policy.applyToString(original)).isEqualTo(expected);
}
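As a usage note, the selector-based constructor can be driven directly. A minimal sketch, assuming Jackson on the classpath and ignoring the classes' package visibility; the masking characters are positional (upper case, lower case, digit, other), as the test data above implies:

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ContainerNode;
import java.util.List;

class MaskUsageExample {
  public static void main(String[] args) throws Exception {
    FieldsSelector selector = fieldName -> fieldName.equals("name"); // mask only "name"
    Mask policy = new Mask(selector, List.of("X", "x", "n", "-"));
    ContainerNode<?> json = (ContainerNode<?>) new ObjectMapper().readTree("{\"name\":\"Bond777!\"}");
    System.out.println(policy.applyToJsonContainer(json)); // expected: {"name":"Xxxxnnn-"}
  }
}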

View file

@@ -15,39 +15,39 @@ import org.junit.jupiter.params.provider.MethodSource;
class RemoveTest {
-private static final List<String> TARGET_FIELDS = List.of("id", "name");
+private static final FieldsSelector FIELDS_SELECTOR = fieldName -> List.of("id", "name").contains(fieldName);
@ParameterizedTest
@MethodSource
-void testApplyToJsonContainer(List<String> fields, ContainerNode<?> original, ContainerNode<?> expected) {
-var policy = new Remove(fields);
+void testApplyToJsonContainer(FieldsSelector fieldsSelector, ContainerNode<?> original, ContainerNode<?> expected) {
+var policy = new Remove(fieldsSelector);
assertThat(policy.applyToJsonContainer(original)).isEqualTo(expected);
}
private static Stream<Arguments> testApplyToJsonContainer() {
return Stream.of(
Arguments.of(
-TARGET_FIELDS,
+FIELDS_SELECTOR,
parse("{ \"id\": 123, \"name\": { \"first\": \"James\", \"surname\": \"Bond777!\"}}"),
parse("{}")
),
Arguments.of(
-TARGET_FIELDS,
+FIELDS_SELECTOR,
parse("[{ \"id\": 123, \"f2\": 234}, { \"name\": \"1.2\", \"f2\": 345} ]"),
parse("[{ \"f2\": 234}, { \"f2\": 345} ]")
),
Arguments.of(
-TARGET_FIELDS,
+FIELDS_SELECTOR,
parse("{ \"outer\": { \"f1\": \"James\", \"name\": \"Bond777!\"}}"),
parse("{ \"outer\": { \"f1\": \"James\"}}")
),
Arguments.of(
-List.of(),
+(FieldsSelector) (fieldName -> true),
parse("{ \"outer\": { \"f1\": \"v1\", \"f2\": \"v2\", \"inner\" : {\"if1\": \"iv1\"}}}"),
parse("{}")
),
Arguments.of(
-List.of(),
+(FieldsSelector) (fieldName -> true),
parse("[{ \"f1\": 123}, { \"f2\": \"1.2\"} ]"),
parse("[{}, {}]")
)
@@ -66,7 +66,7 @@ class RemoveTest {
"null, null"
})
void testApplyToString(String original, String expected) {
-var policy = new Remove(List.of());
+var policy = new Remove(fieldName -> true);
assertThat(policy.applyToString(original)).isEqualTo(expected);
}
}
}

View file

@@ -15,35 +15,35 @@ import org.junit.jupiter.params.provider.MethodSource;
class ReplaceTest {
-private static final List<String> TARGET_FIELDS = List.of("id", "name");
+private static final FieldsSelector FIELDS_SELECTOR = fieldName -> List.of("id", "name").contains(fieldName);
private static final String REPLACEMENT_STRING = "***";
@ParameterizedTest
@MethodSource
-void testApplyToJsonContainer(List<String> fields, ContainerNode<?> original, ContainerNode<?> expected) {
-var policy = new Replace(fields, REPLACEMENT_STRING);
+void testApplyToJsonContainer(FieldsSelector fieldsSelector, ContainerNode<?> original, ContainerNode<?> expected) {
+var policy = new Replace(fieldsSelector, REPLACEMENT_STRING);
assertThat(policy.applyToJsonContainer(original)).isEqualTo(expected);
}
private static Stream<Arguments> testApplyToJsonContainer() {
return Stream.of(
Arguments.of(
-TARGET_FIELDS,
+FIELDS_SELECTOR,
parse("{ \"id\": 123, \"name\": { \"first\": \"James\", \"surname\": \"Bond777!\"}}"),
parse("{ \"id\": \"***\", \"name\": { \"first\": \"***\", \"surname\": \"***\"}}")
),
Arguments.of(
-TARGET_FIELDS,
+FIELDS_SELECTOR,
parse("[{ \"id\": 123, \"f2\": 234}, { \"name\": \"1.2\", \"f2\": 345} ]"),
parse("[{ \"id\": \"***\", \"f2\": 234}, { \"name\": \"***\", \"f2\": 345} ]")
),
Arguments.of(
-TARGET_FIELDS,
+FIELDS_SELECTOR,
parse("{ \"outer\": { \"f1\": \"James\", \"name\": \"Bond777!\"}}"),
parse("{ \"outer\": { \"f1\": \"James\", \"name\": \"***\"}}")
),
Arguments.of(
-List.of(),
+(FieldsSelector) (fieldName -> true),
parse("{ \"outer\": { \"f1\": \"v1\", \"f2\": \"v2\", \"inner\" : {\"if1\": \"iv1\"}}}"),
parse("{ \"outer\": { \"f1\": \"***\", \"f2\": \"***\", \"inner\" : {\"if1\": \"***\"}}}}")
)
@@ -62,7 +62,7 @@ class ReplaceTest {
"null, ***"
})
void testApplyToString(String original, String expected) {
-var policy = new Replace(List.of(), REPLACEMENT_STRING);
+var policy = new Replace(fieldName -> true, REPLACEMENT_STRING);
assertThat(policy.applyToString(original)).isEqualTo(expected);
}
}
}

View file

@@ -59,8 +59,10 @@ class ProtobufSchemaConverterTest {
TestMsg outer_ref = 2;
EmbeddedMsg self_ref = 3;
}
-}""";
+map<int32, string> intToStringMap = 21;
+map<string, EmbeddedMsg> strToObjMap = 22;
+}""";
String expectedJsonSchema = """
{
@@ -109,7 +111,9 @@ class ProtobufSchemaConverterTest {
"v2": { "type": [ "number", "string", "object", "array", "boolean", "null" ] },
"uint32_w_field": { "type": "integer", "maximum": 4294967295, "minimum": 0 },
"bool_w_field": { "type": "boolean" },
-"uint64_w_field": { "type": "integer", "maximum": 18446744073709551615, "minimum": 0 }
+"uint64_w_field": { "type": "integer", "maximum": 18446744073709551615, "minimum": 0 },
+"strToObjMap": { "type": "object", "additionalProperties": true },
+"intToStringMap": { "type": "object", "additionalProperties": true }
}
},
"test.TestMsg.EmbeddedMsg": {

View file

@@ -0,0 +1 @@
some content goes here

View file

@@ -101,9 +101,6 @@
<useSpringBoot3>true</useSpringBoot3>
<dateLibrary>java8</dateLibrary>
</configOptions>
-<typeMappings>
-<mapping>filepart=org.springframework.http.codec.multipart.FilePart</mapping>
-</typeMappings>
</configuration>
</execution>
<execution>

View file

@@ -1730,6 +1730,125 @@ paths:
404:
description: Not found
/api/clusters/{clusterName}/acls:
get:
tags:
- Acls
summary: listKafkaAcls
operationId: listAcls
parameters:
- name: clusterName
in: path
required: true
schema:
type: string
- name: resourceType
in: query
required: false
schema:
$ref: '#/components/schemas/KafkaAclResourceType'
- name: resourceName
in: query
required: false
schema:
type: string
- name: namePatternType
in: query
required: false
schema:
$ref: '#/components/schemas/KafkaAclNamePatternType'
responses:
200:
description: OK
content:
application/json:
schema:
type: array
items:
$ref: '#/components/schemas/KafkaAcl'
/api/clusters/{clusterName}/acl/csv:
get:
tags:
- Acls
summary: getAclAsCsv
operationId: getAclAsCsv
parameters:
- name: clusterName
in: path
required: true
schema:
type: string
responses:
200:
description: OK
content:
text/plain:
schema:
type: string
post:
tags:
- Acls
summary: syncAclsCsv
operationId: syncAclsCsv
parameters:
- name: clusterName
in: path
required: true
schema:
type: string
requestBody:
content:
text/plain:
schema:
type: string
responses:
200:
description: OK
/api/clusters/{clusterName}/acl:
post:
tags:
- Acls
summary: createAcl
operationId: createAcl
parameters:
- name: clusterName
in: path
required: true
schema:
type: string
requestBody:
content:
application/json:
schema:
$ref: '#/components/schemas/KafkaAcl'
responses:
200:
description: OK
delete:
tags:
- Acls
summary: deleteAcl
operationId: deleteAcl
parameters:
- name: clusterName
in: path
required: true
schema:
type: string
requestBody:
content:
application/json:
schema:
$ref: '#/components/schemas/KafkaAcl'
responses:
200:
description: OK
404:
description: Acl not found
/api/authorization:
get:
tags:
@@ -1819,7 +1938,7 @@ paths:
properties:
file:
type: string
-format: filepart
+format: binary
responses:
200:
description: OK
@@ -1972,6 +2091,8 @@ components:
- KAFKA_CONNECT
- KSQL_DB
- TOPIC_DELETION
- KAFKA_ACL_VIEW # get ACLs listing
- KAFKA_ACL_EDIT # create & delete ACLs
required:
- id
- name
@@ -2375,6 +2496,16 @@ components:
type: number
bytesOutPerSec:
type: number
partitionsLeader:
type: integer
partitions:
type: integer
inSyncPartitions:
type: integer
partitionsSkew:
type: number
leadersSkew:
type: number
required:
- id
@@ -2441,6 +2572,7 @@ components:
- MEMBERS
- STATE
- MESSAGES_BEHIND
- TOPIC_NUM
ConsumerGroupsPageResponse:
type: object
@@ -3331,6 +3463,62 @@ components:
- SCHEMA
- CONNECT
- KSQL
- ACL
KafkaAcl:
type: object
required: [resourceType, resourceName, namePatternType, principal, host, operation, permission]
properties:
resourceType:
$ref: '#/components/schemas/KafkaAclResourceType'
resourceName:
type: string # "*" if acl can be applied to any resource of given type
namePatternType:
$ref: '#/components/schemas/KafkaAclNamePatternType'
principal:
type: string
host:
type: string # "*" if acl can be applied to any resource of given type
operation:
type: string
enum:
- UNKNOWN # Unknown operation, need to update mapping code on BE
- ALL # Cluster, Topic, Group
- READ # Topic, Group
- WRITE # Topic, TransactionalId
- CREATE # Cluster, Topic
- DELETE # Topic, Group
- ALTER # Cluster, Topic,
- DESCRIBE # Cluster, Topic, Group, TransactionalId, DelegationToken
- CLUSTER_ACTION # Cluster
- DESCRIBE_CONFIGS # Cluster, Topic
- ALTER_CONFIGS # Cluster, Topic
- IDEMPOTENT_WRITE # Cluster
- CREATE_TOKENS
- DESCRIBE_TOKENS
permission:
type: string
enum:
- ALLOW
- DENY
KafkaAclResourceType:
type: string
enum:
- UNKNOWN # Unknown operation, need to update mapping code on BE
- TOPIC
- GROUP
- CLUSTER
- TRANSACTIONAL_ID
- DELEGATION_TOKEN
- USER
KafkaAclNamePatternType:
type: string
enum:
- MATCH
- LITERAL
- PREFIXED
RestartRequest:
type: object
@@ -3467,6 +3655,12 @@ components:
type: array
items:
$ref: '#/components/schemas/Action'
webclient:
type: object
properties:
maxInMemoryBufferSize:
type: string
description: "examples: 20, 12KB, 5MB"
kafka:
type: object
properties:
@@ -3479,6 +3673,14 @@ components:
type: integer
noDataEmptyPolls:
type: integer
maxPageSize:
type: integer
defaultPageSize:
type: integer
adminClientTimeout:
type: integer
internalTopicPrefix:
type: string
clusters:
type: array
items:
@@ -3607,7 +3809,9 @@ components:
type: array
items:
type: string
-pattern:
+fieldsNamePattern:
type: string
maskingCharsReplacement:
type: array
items:
type: string
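A quick smoke check of the new ACL endpoints defined above; a sketch assuming a locally running instance on port 8080 and a configured cluster named "local" (both assumptions):

import org.springframework.web.reactive.function.client.WebClient;

class AclEndpointsExample {
  public static void main(String[] args) {
    WebClient client = WebClient.create("http://localhost:8080"); // assumed base URL
    // list ACLs; the resourceType/resourceName/namePatternType query params are optional
    String acls = client.get()
        .uri("/api/clusters/{clusterName}/acls", "local")
        .retrieve()
        .bodyToMono(String.class)
        .block();
    System.out.println(acls);
    // export ACLs as CSV, the same format accepted by POST /api/clusters/{clusterName}/acl/csv
    String csv = client.get()
        .uri("/api/clusters/{clusterName}/acl/csv", "local")
        .retrieve()
        .bodyToMono(String.class)
        .block();
    System.out.println(csv);
  }
}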

View file

@@ -36,29 +36,31 @@ import org.springframework.web.reactive.function.client.WebClientResponseException;
@Slf4j
public class ApiService extends BaseSource {
+private final ApiClient apiClient = new ApiClient().setBasePath(BASE_API_URL);
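// all API facades below now reuse this single client instead of building a new one per call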
@SneakyThrows
private TopicsApi topicApi() {
-return new TopicsApi(new ApiClient().setBasePath(BASE_API_URL));
+return new TopicsApi(apiClient);
}
@SneakyThrows
private SchemasApi schemaApi() {
-return new SchemasApi(new ApiClient().setBasePath(BASE_API_URL));
+return new SchemasApi(apiClient);
}
@SneakyThrows
private KafkaConnectApi connectorApi() {
-return new KafkaConnectApi(new ApiClient().setBasePath(BASE_API_URL));
+return new KafkaConnectApi(apiClient);
}
@SneakyThrows
private MessagesApi messageApi() {
-return new MessagesApi(new ApiClient().setBasePath(BASE_API_URL));
+return new MessagesApi(apiClient);
}
@SneakyThrows
private KsqlApi ksqlApi() {
-return new KsqlApi(new ApiClient().setBasePath(BASE_API_URL));
+return new KsqlApi(apiClient);
}
@SneakyThrows

View file

@@ -1,6 +1,8 @@
package com.provectus.kafka.ui.manualsuite.backlog;
import static com.provectus.kafka.ui.qasesuite.BaseQaseTest.BROKERS_SUITE_ID;
+import static com.provectus.kafka.ui.qasesuite.BaseQaseTest.KSQL_DB_SUITE_ID;
+import static com.provectus.kafka.ui.qasesuite.BaseQaseTest.SCHEMAS_SUITE_ID;
import static com.provectus.kafka.ui.qasesuite.BaseQaseTest.TOPICS_PROFILE_SUITE_ID;
import static com.provectus.kafka.ui.utilities.qase.enums.State.TO_BE_AUTOMATED;
@@ -46,4 +48,39 @@ public class SmokeBacklog extends BaseManualTest {
@Test
public void testCaseE() {
}
@Automation(state = TO_BE_AUTOMATED)
@Suite(id = TOPICS_PROFILE_SUITE_ID)
@QaseId(343)
@Test
public void testCaseF() {
}
@Automation(state = TO_BE_AUTOMATED)
@Suite(id = KSQL_DB_SUITE_ID)
@QaseId(344)
@Test
public void testCaseG() {
}
@Automation(state = TO_BE_AUTOMATED)
@Suite(id = SCHEMAS_SUITE_ID)
@QaseId(345)
@Test
public void testCaseH() {
}
@Automation(state = TO_BE_AUTOMATED)
@Suite(id = SCHEMAS_SUITE_ID)
@QaseId(346)
@Test
public void testCaseI() {
}
@Automation(state = TO_BE_AUTOMATED)
@Suite(id = TOPICS_PROFILE_SUITE_ID)
@QaseId(347)
@Test
public void testCaseJ() {
}
}

View file

@@ -92,4 +92,28 @@ public class TopicsTest extends BaseManualTest {
@Test
public void testCaseN() {
}
@Automation(state = NOT_AUTOMATED)
@QaseId(337)
@Test
public void testCaseO() {
}
@Automation(state = NOT_AUTOMATED)
@QaseId(339)
@Test
public void testCaseP() {
}
@Automation(state = NOT_AUTOMATED)
@QaseId(341)
@Test
public void testCaseQ() {
}
@Automation(state = NOT_AUTOMATED)
@QaseId(342)
@Test
public void testCaseR() {
}
}

View file

@@ -14,4 +14,16 @@ public class WizardTest extends BaseManualTest {
@Test
public void testCaseA() {
}
@Automation(state = NOT_AUTOMATED)
@QaseId(338)
@Test
public void testCaseB() {
}
@Automation(state = NOT_AUTOMATED)
@QaseId(340)
@Test
public void testCaseC() {
}
}

View file

@@ -486,11 +486,7 @@ public class TopicsTest extends BaseTest {
topicDetails
.waitUntilScreenReady();
TOPIC_LIST.add(topicToCopy);
-SoftAssert softly = new SoftAssert();
-softly.assertTrue(topicDetails.isAlertWithMessageVisible(SUCCESS, "Topic successfully created."),
-"isAlertWithMessageVisible()");
-softly.assertTrue(topicDetails.isTopicHeaderVisible(topicToCopy.getName()), "isTopicHeaderVisible()");
-softly.assertAll();
+Assert.assertTrue(topicDetails.isTopicHeaderVisible(topicToCopy.getName()), "isTopicHeaderVisible()");
}
@AfterClass(alwaysRun = true)

View file

@@ -37,7 +37,7 @@ const Config: React.FC = () => {
formState: { isDirty, isSubmitting, isValid, errors },
setValue,
} = useForm<FormValues>({
-mode: 'onTouched',
+mode: 'onChange',
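// 'onChange' re-validates on every change; 'onTouched' validates only after a field has been blurred once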
resolver: yupResolver(validationSchema),
defaultValues: {
config: JSON.stringify(config, null, '\t'),

Some files were not shown because too many files have changed in this diff.