Merge branch 'master' into issues/stopQueryFunctionalCheck3

# Conflicts:
#	kafka-ui-e2e-checks/src/test/java/com/provectus/kafka/ui/manualsuite/backlog/SmokeBacklog.java
#	kafka-ui-e2e-checks/src/test/java/com/provectus/kafka/ui/smokesuite/ksqldb/KsqlDbTest.java
This commit is contained in:
VladSenyuta 2023-05-02 09:45:49 +03:00
commit 80fd6631a7
106 changed files with 1581 additions and 852 deletions

92
.github/ISSUE_TEMPLATE/bug.yml vendored Normal file
View file

@ -0,0 +1,92 @@
name: "\U0001F41E Bug report"
description: File a bug report
labels: ["status/triage", "type/bug"]
assignees: []
body:
- type: markdown
attributes:
value: |
Hi, thanks for raising the issue(-s), all contributions really matter!
Please, note that we'll close the issue without further explanation if you don't follow
this template and don't provide the information requested within this template.
- type: checkboxes
id: terms
attributes:
label: Issue submitter TODO list
description: By you checking these checkboxes we can be sure you've done the essential things.
options:
- label: I've looked up my issue in [FAQ](https://docs.kafka-ui.provectus.io/faq/common-problems)
required: true
- label: I've searched for an already existing issues [here](https://github.com/provectus/kafka-ui/issues)
required: true
- label: I've tried running `master`-labeled docker image and the issue still persists there
required: true
- label: I'm running a supported version of the application which is listed [here](https://github.com/provectus/kafka-ui/blob/master/SECURITY.md)
required: true
- type: textarea
attributes:
label: Describe the bug (actual behavior)
description: A clear and concise description of what the bug is. Use a list, if there is more than one problem
validations:
required: true
- type: textarea
attributes:
label: Expected behavior
description: A clear and concise description of what you expected to happen
validations:
required: false
- type: textarea
attributes:
label: Your installation details
description: |
How do you run the app? Please provide as much info as possible:
1. App version (commit hash in the top left corner of the UI)
2. Helm chart version, if you use one
3. Your application config. Please remove the sensitive info like passwords or API keys.
4. Any IAAC configs
validations:
required: true
- type: textarea
attributes:
label: Steps to reproduce
description: |
Please write down the order of the actions required to reproduce the issue.
For the advanced setups/complicated issue, we might need you to provide
a minimal [reproducible example](https://stackoverflow.com/help/minimal-reproducible-example).
validations:
required: true
- type: textarea
attributes:
label: Screenshots
description: |
If applicable, add screenshots to help explain your problem
validations:
required: false
- type: textarea
attributes:
label: Logs
description: |
If applicable, *upload* screenshots to help explain your problem
validations:
required: false
- type: textarea
attributes:
label: Additional context
description: |
Add any other context about the problem here. E.G.:
1. Are there any alternative scenarios (different data/methods/configuration/setup) you have tried?
Were they successful or the same issue occurred? Please provide steps as well.
2. Related issues (if there are any).
3. Logs (if available)
4. Is there any serious impact or behaviour on the end-user because of this issue, that can be overlooked?
validations:
required: false

View file

@ -1,64 +0,0 @@
---
name: "\U0001F41E Bug report"
about: Create a bug report
title: ''
labels: status/triage, type/bug
assignees: ''
---
<!--
We will close the issue without further explanation if you don't follow this template and don't provide the information requested within this template.
Don't forget to check for existing issues/discussions regarding your proposal. We might already have it.
https://github.com/provectus/kafka-ui/issues
https://github.com/provectus/kafka-ui/discussions
-->
<!--
Please follow the naming conventions for bugs:
<Feature/Area/Scope> : <Compact, but specific problem summary>
Avoid generic titles, like “Topics: incorrect layout of message sorting drop-down list”. Better use something like: “Topics: Message sorting drop-down list overlaps the "Submit" button”.
-->
**Describe the bug** (Actual behavior)
<!--(A clear and concise description of what the bug is.Use a list, if there is more than one problem)-->
**Expected behavior**
<!--(A clear and concise description of what you expected to happen.)-->
**Set up**
<!--
WE MIGHT CLOSE THE ISSUE without further explanation IF YOU DON'T PROVIDE THIS INFORMATION.
How do you run the app? Please provide as much info as possible:
1. App version (docker image version or check commit hash in the top left corner in UI)
2. Helm chart version, if you use one
3. Any IAAC configs
-->
**Steps to Reproduce**
<!-- We'd like you to provide an example setup (via docker-compose, helm, etc.)
to reproduce the problem, especially with a complex setups. -->
1.
**Screenshots**
<!--
(If applicable, add screenshots to help explain your problem)
-->
**Additional context**
<!--
Add any other context about the problem here. E.g.:
1. Are there any alternative scenarios (different data/methods/configuration/setup) you have tried?
Were they successfull or same issue occured? Please provide steps as well.
2. Related issues (if there are any).
3. Logs (if available)
4. Is there any serious impact or behaviour on the end-user because of this issue, that can be overlooked?
-->

11
.github/ISSUE_TEMPLATE/config.yml vendored Normal file
View file

@ -0,0 +1,11 @@
blank_issues_enabled: false
contact_links:
- name: Official documentation
url: https://docs.kafka-ui.provectus.io/
about: Before reaching out for support, please refer to our documentation. Read "FAQ" and "Common problems", also try using search there.
- name: Community Discord
url: https://discord.gg/4DWzD7pGE5
about: Chat with other users, get some support or ask questions.
- name: GitHub Discussions
url: https://github.com/provectus/kafka-ui/discussions
about: An alternative place to ask questions or to get some support.

66
.github/ISSUE_TEMPLATE/feature.yml vendored Normal file
View file

@ -0,0 +1,66 @@
name: "\U0001F680 Feature request"
description: Propose a new feature
labels: ["status/triage", "type/feature"]
assignees: []
body:
- type: markdown
attributes:
value: |
Hi, thanks for raising the issue(-s), all contributions really matter!
Please, note that we'll close the issue without further explanation if you don't follow
this template and don't provide the information requested within this template.
- type: checkboxes
id: terms
attributes:
label: Issue submitter TODO list
description: By you checking these checkboxes we can be sure you've done the essential things.
options:
- label: I've searched for an already existing issues [here](https://github.com/provectus/kafka-ui/issues)
required: true
- label: I'm running a supported version of the application which is listed [here](https://github.com/provectus/kafka-ui/blob/master/SECURITY.md) and the feature is not present there
required: true
- type: textarea
attributes:
label: Is your proposal related to a problem?
description: |
Provide a clear and concise description of what the problem is.
For example, "I'm always frustrated when..."
validations:
required: false
- type: textarea
attributes:
label: Describe the feature you're interested in
description: |
Provide a clear and concise description of what you want to happen.
validations:
required: true
- type: textarea
attributes:
label: Describe alternatives you've considered
description: |
Let us know about other solutions you've tried or researched.
validations:
required: false
- type: input
attributes:
label: Version you're running
description: |
Please provide the app version you're currently running:
1. App version (commit hash in the top left corner of the UI)
validations:
required: true
- type: textarea
attributes:
label: Additional context
description: |
Is there anything else you can add about the proposal?
You might want to link to related issues here, if you haven't already.
validations:
required: false

View file

@ -1,46 +0,0 @@
---
name: "\U0001F680 Feature request"
about: Propose a new feature
title: ''
labels: status/triage, type/feature
assignees: ''
---
<!--
Don't forget to check for existing issues/discussions regarding your proposal. We might already have it.
https://github.com/provectus/kafka-ui/issues
https://github.com/provectus/kafka-ui/discussions
-->
### Which version of the app are you running?
<!-- Please provide docker image version or check commit hash in the top left corner in UI) -->
### Is your proposal related to a problem?
<!--
Provide a clear and concise description of what the problem is.
For example, "I'm always frustrated when..."
-->
### Describe the solution you'd like
<!--
Provide a clear and concise description of what you want to happen.
-->
### Describe alternatives you've considered
<!--
Let us know about other solutions you've tried or researched.
-->
### Additional context
<!--
Is there anything else you can add about the proposal?
You might want to link to related issues here, if you haven't already.
-->

92
.github/ISSUE_TEMPLATE/helm.yml vendored Normal file
View file

@ -0,0 +1,92 @@
name: "⎈ K8s/Helm problem report"
description: "Report a problem with k8s/helm charts/etc"
labels: ["status/triage", "scope/k8s"]
assignees: []
body:
- type: markdown
attributes:
value: |
Hi, thanks for raising the issue(-s), all contributions really matter!
Please, note that we'll close the issue without further explanation if you don't follow
this template and don't provide the information requested within this template.
- type: checkboxes
id: terms
attributes:
label: Issue submitter TODO list
description: By you checking these checkboxes we can be sure you've done the essential things.
options:
- label: I've looked up my issue in [FAQ](https://docs.kafka-ui.provectus.io/faq/common-problems)
required: true
- label: I've searched for an already existing issues [here](https://github.com/provectus/kafka-ui/issues)
required: true
- label: I've tried running `master`-labeled docker image and the issue still persists there
required: true
- label: I'm running a supported version of the application which is listed [here](https://github.com/provectus/kafka-ui/blob/master/SECURITY.md)
required: true
- type: textarea
attributes:
label: Describe the bug (actual behavior)
description: A clear and concise description of what the bug is. Use a list, if there is more than one problem
validations:
required: true
- type: textarea
attributes:
label: Expected behavior
description: A clear and concise description of what you expected to happen
validations:
required: false
- type: textarea
attributes:
label: Your installation details
description: |
How do you run the app? Please provide as much info as possible:
1. App version (commit hash in the top left corner of the UI)
2. Helm chart version
3. Your application config. Please remove the sensitive info like passwords or API keys.
4. Any IAAC configs
validations:
required: true
- type: textarea
attributes:
label: Steps to reproduce
description: |
Please write down the order of the actions required to reproduce the issue.
For the advanced setups/complicated issue, we might need you to provide
a minimal [reproducible example](https://stackoverflow.com/help/minimal-reproducible-example).
validations:
required: true
- type: textarea
attributes:
label: Screenshots
description: |
If applicable, add screenshots to help explain your problem
validations:
required: false
- type: textarea
attributes:
label: Logs
description: |
If applicable, *upload* screenshots to help explain your problem
validations:
required: false
- type: textarea
attributes:
label: Additional context
description: |
Add any other context about the problem here. E.G.:
1. Are there any alternative scenarios (different data/methods/configuration/setup) you have tried?
Were they successful or the same issue occurred? Please provide steps as well.
2. Related issues (if there are any).
3. Logs (if available)
4. Is there any serious impact or behaviour on the end-user because of this issue, that can be overlooked?
validations:
required: false

View file

@ -1,52 +0,0 @@
---
name: "⎈ K8s/Helm problem report"
about: Report a problem with k8s/helm charts/etc
title: ''
labels: scope/k8s, status/triage
assignees: azatsafin
---
<!--
Don't forget to check for existing issues/discussions regarding your proposal. We might already have it.
https://github.com/provectus/kafka-ui/issues
https://github.com/provectus/kafka-ui/discussions
-->
**Describe the bug**
<!--(A clear and concise description of what the bug is.)-->
**Set up**
<!--
How do you run the app? Please provide as much info as possible:
1. App version (docker image version or check commit hash in the top left corner in UI)
2. Helm chart version, if you use one
3. Any IAAC configs
We might close the issue without further explanation if you don't provide such information.
-->
**Steps to Reproduce**
Steps to reproduce the behavior:
1.
**Expected behavior**
<!--
(A clear and concise description of what you expected to happen)
-->
**Screenshots**
<!--
(If applicable, add screenshots to help explain your problem)
-->
**Additional context**
<!--
(Add any other context about the problem here)
-->

View file

@ -1,16 +0,0 @@
---
name: "❓ Question"
about: Ask a question
title: ''
---
<!--
To ask a question, please either:
1. Open up a discussion (https://github.com/provectus/kafka-ui/discussions)
2. Join us on discord (https://discord.gg/4DWzD7pGE5) and ask there.
Don't forget to check/search for existing issues/discussions.
-->

View file

@ -8,8 +8,6 @@ updates:
timezone: Europe/Moscow timezone: Europe/Moscow
reviewers: reviewers:
- "Haarolean" - "Haarolean"
assignees:
- "Haarolean"
labels: labels:
- "scope/backend" - "scope/backend"
- "type/dependencies" - "type/dependencies"
@ -99,8 +97,6 @@ updates:
timezone: Europe/Moscow timezone: Europe/Moscow
reviewers: reviewers:
- "Haarolean" - "Haarolean"
assignees:
- "Haarolean"
labels: labels:
- "scope/infrastructure" - "scope/infrastructure"
- "type/dependencies" - "type/dependencies"

View file

@ -6,7 +6,7 @@ jobs:
block_merge: block_merge:
runs-on: ubuntu-latest runs-on: ubuntu-latest
steps: steps:
- uses: mheap/github-action-required-labels@v3 - uses: mheap/github-action-required-labels@v4
with: with:
mode: exactly mode: exactly
count: 0 count: 0

View file

@ -86,7 +86,7 @@ jobs:
- name: make comment with private deployment link - name: make comment with private deployment link
if: ${{ github.event.label.name == 'status/feature_testing' }} if: ${{ github.event.label.name == 'status/feature_testing' }}
uses: peter-evans/create-or-update-comment@v2 uses: peter-evans/create-or-update-comment@v3
with: with:
issue-number: ${{ github.event.pull_request.number }} issue-number: ${{ github.event.pull_request.number }}
body: | body: |
@ -94,7 +94,7 @@ jobs:
- name: make comment with public deployment link - name: make comment with public deployment link
if: ${{ github.event.label.name == 'status/feature_testing_public' }} if: ${{ github.event.label.name == 'status/feature_testing_public' }}
uses: peter-evans/create-or-update-comment@v2 uses: peter-evans/create-or-update-comment@v3
with: with:
issue-number: ${{ github.event.pull_request.number }} issue-number: ${{ github.event.pull_request.number }}
body: | body: |

View file

@ -21,7 +21,7 @@ jobs:
git add ../kafka-ui-from-branch/ git add ../kafka-ui-from-branch/
git commit -m "removed env:${{ needs.build.outputs.deploy }}" && git push || true git commit -m "removed env:${{ needs.build.outputs.deploy }}" && git push || true
- name: make comment with deployment link - name: make comment with deployment link
uses: peter-evans/create-or-update-comment@v2 uses: peter-evans/create-or-update-comment@v3
with: with:
issue-number: ${{ github.event.pull_request.number }} issue-number: ${{ github.event.pull_request.number }}
body: | body: |

View file

@ -65,7 +65,7 @@ jobs:
cache-from: type=local,src=/tmp/.buildx-cache cache-from: type=local,src=/tmp/.buildx-cache
cache-to: type=local,dest=/tmp/.buildx-cache cache-to: type=local,dest=/tmp/.buildx-cache
- name: make comment with private deployment link - name: make comment with private deployment link
uses: peter-evans/create-or-update-comment@v2 uses: peter-evans/create-or-update-comment@v3
with: with:
issue-number: ${{ github.event.pull_request.number }} issue-number: ${{ github.event.pull_request.number }}
body: | body: |

View file

@ -55,7 +55,7 @@ jobs:
cache-to: type=local,dest=/tmp/.buildx-cache cache-to: type=local,dest=/tmp/.buildx-cache
- name: Run CVE checks - name: Run CVE checks
uses: aquasecurity/trivy-action@0.9.2 uses: aquasecurity/trivy-action@0.10.0
with: with:
image-ref: "provectuslabs/kafka-ui:${{ steps.build.outputs.version }}" image-ref: "provectuslabs/kafka-ui:${{ steps.build.outputs.version }}"
format: "table" format: "table"

View file

@ -33,7 +33,7 @@ jobs:
--image-ids imageTag=${{ steps.extract_branch.outputs.tag }} \ --image-ids imageTag=${{ steps.extract_branch.outputs.tag }} \
--region us-east-1 --region us-east-1
- name: make comment with private deployment link - name: make comment with private deployment link
uses: peter-evans/create-or-update-comment@v2 uses: peter-evans/create-or-update-comment@v3
with: with:
issue-number: ${{ github.event.pull_request.number }} issue-number: ${{ github.event.pull_request.number }}
body: | body: |

View file

@ -7,7 +7,7 @@ jobs:
stale: stale:
runs-on: ubuntu-latest runs-on: ubuntu-latest
steps: steps:
- uses: actions/stale@v7 - uses: actions/stale@v8
with: with:
days-before-issue-stale: 7 days-before-issue-stale: 7
days-before-issue-close: 3 days-before-issue-close: 3

View file

@ -6,7 +6,8 @@ Following versions of the project are currently being supported with security up
| Version | Supported | | Version | Supported |
| ------- | ------------------ | | ------- | ------------------ |
| 0.5.x | :white_check_mark: | | 0.6.x | :white_check_mark: |
| 0.5.x | :x: |
| 0.4.x | :x: | | 0.4.x | :x: |
| 0.3.x | :x: | | 0.3.x | :x: |
| 0.2.x | :x: | | 0.2.x | :x: |

View file

@ -2,6 +2,6 @@ apiVersion: v2
name: kafka-ui name: kafka-ui
description: A Helm chart for kafka-UI description: A Helm chart for kafka-UI
type: application type: application
version: 0.6.1 version: 0.6.2
appVersion: v0.6.1 appVersion: v0.6.2
icon: https://github.com/provectus/kafka-ui/raw/master/documentation/images/kafka-ui-logo.png icon: https://github.com/provectus/kafka-ui/raw/master/documentation/images/kafka-ui-logo.png

View file

@ -9,4 +9,6 @@ message MySpecificTopicValue {
message MyValue { message MyValue {
int32 version = 1; int32 version = 1;
string payload = 2; string payload = 2;
map<int32, string> intToStringMap = 3;
map<string, MyValue> strToObjMap = 4;
} }

View file

@ -21,6 +21,12 @@
</properties> </properties>
<dependencies> <dependencies>
<dependency>
<!--TODO: remove, when spring-boot fixed dependency to 6.0.8+ (6.0.7 has CVE) -->
<groupId>org.springframework</groupId>
<artifactId>spring-core</artifactId>
<version>6.0.8</version>
</dependency>
<dependency> <dependency>
<groupId>org.springframework.boot</groupId> <groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId> <artifactId>spring-boot-starter-webflux</artifactId>
@ -109,6 +115,12 @@
<groupId>io.projectreactor.addons</groupId> <groupId>io.projectreactor.addons</groupId>
<artifactId>reactor-extra</artifactId> <artifactId>reactor-extra</artifactId>
</dependency> </dependency>
<!-- https://github.com/provectus/kafka-ui/pull/3693 -->
<dependency>
<groupId>org.json</groupId>
<artifactId>json</artifactId>
<version>${org.json.version}</version>
</dependency>
<dependency> <dependency>
<groupId>org.springframework.boot</groupId> <groupId>org.springframework.boot</groupId>

View file

@ -27,6 +27,8 @@ public class ClustersProperties {
String internalTopicPrefix; String internalTopicPrefix;
Integer adminClientTimeout;
PollingProperties polling = new PollingProperties(); PollingProperties polling = new PollingProperties();
@Data @Data
@ -56,6 +58,8 @@ public class ClustersProperties {
Integer pollTimeoutMs; Integer pollTimeoutMs;
Integer partitionPollTimeout; Integer partitionPollTimeout;
Integer noDataEmptyPolls; Integer noDataEmptyPolls;
Integer maxPageSize;
Integer defaultPageSize;
} }
@Data @Data
@ -127,8 +131,9 @@ public class ClustersProperties {
@Data @Data
public static class Masking { public static class Masking {
Type type; Type type;
List<String> fields; //if null or empty list - policy will be applied to all fields List<String> fields;
List<String> pattern; //used when type=MASK String fieldsNamePattern;
List<String> maskingCharsReplacement; //used when type=MASK
String replacement; //used when type=REPLACE String replacement; //used when type=REPLACE
String topicKeysPattern; String topicKeysPattern;
String topicValuesPattern; String topicValuesPattern;

View file

@ -5,7 +5,6 @@ import java.util.Map;
import lombok.AllArgsConstructor; import lombok.AllArgsConstructor;
import org.openapitools.jackson.nullable.JsonNullableModule; import org.openapitools.jackson.nullable.JsonNullableModule;
import org.springframework.beans.factory.ObjectProvider; import org.springframework.beans.factory.ObjectProvider;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.web.ServerProperties; import org.springframework.boot.autoconfigure.web.ServerProperties;
import org.springframework.boot.autoconfigure.web.reactive.WebFluxProperties; import org.springframework.boot.autoconfigure.web.reactive.WebFluxProperties;
import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContext;
@ -15,8 +14,6 @@ import org.springframework.http.server.reactive.ContextPathCompositeHandler;
import org.springframework.http.server.reactive.HttpHandler; import org.springframework.http.server.reactive.HttpHandler;
import org.springframework.jmx.export.MBeanExporter; import org.springframework.jmx.export.MBeanExporter;
import org.springframework.util.StringUtils; import org.springframework.util.StringUtils;
import org.springframework.util.unit.DataSize;
import org.springframework.web.reactive.function.client.WebClient;
import org.springframework.web.server.adapter.WebHttpHandlerBuilder; import org.springframework.web.server.adapter.WebHttpHandlerBuilder;
@Configuration @Configuration
@ -52,14 +49,7 @@ public class Config {
} }
@Bean @Bean
public WebClient webClient( // will be used by webflux json mapping
@Value("${webclient.max-in-memory-buffer-size:20MB}") DataSize maxBuffSize) {
return WebClient.builder()
.codecs(c -> c.defaultCodecs().maxInMemorySize((int) maxBuffSize.toBytes()))
.build();
}
@Bean
public JsonNullableModule jsonNullableModule() { public JsonNullableModule jsonNullableModule() {
return new JsonNullableModule(); return new JsonNullableModule();
} }

View file

@ -0,0 +1,33 @@
package com.provectus.kafka.ui.config;
import com.provectus.kafka.ui.exception.ValidationException;
import java.beans.Transient;
import javax.annotation.PostConstruct;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
import org.springframework.util.unit.DataSize;
@Configuration
@ConfigurationProperties("webclient")
@Data
public class WebclientProperties {
String maxInMemoryBufferSize;
@PostConstruct
public void validate() {
validateAndSetDefaultBufferSize();
}
private void validateAndSetDefaultBufferSize() {
if (maxInMemoryBufferSize != null) {
try {
DataSize.parse(maxInMemoryBufferSize);
} catch (Exception e) {
throw new ValidationException("Invalid format for webclient.maxInMemoryBufferSize");
}
}
}
}

View file

@ -0,0 +1,26 @@
package com.provectus.kafka.ui.config.auth;
import lombok.Data;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.properties.ConfigurationProperties;
@ConfigurationProperties("spring.ldap")
@Data
public class LdapProperties {
private String urls;
private String base;
private String adminUser;
private String adminPassword;
private String userFilterSearchBase;
private String userFilterSearchFilter;
@Value("${oauth2.ldap.activeDirectory:false}")
private boolean isActiveDirectory;
@Value("${oauth2.ldap.aсtiveDirectory.domain:@null}")
private String activeDirectoryDomain;
@Value("${oauth2.ldap.groupRoleAttribute:cn}")
private String groupRoleAttribute;
}

View file

@ -1,13 +1,23 @@
package com.provectus.kafka.ui.config.auth; package com.provectus.kafka.ui.config.auth;
import static com.provectus.kafka.ui.config.auth.AbstractAuthSecurityConfig.AUTH_WHITELIST;
import com.provectus.kafka.ui.service.rbac.AccessControlService;
import com.provectus.kafka.ui.service.rbac.extractor.RbacLdapAuthoritiesExtractor;
import java.util.Collection;
import java.util.List; import java.util.List;
import javax.annotation.Nullable;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.autoconfigure.ldap.LdapAutoConfiguration; import org.springframework.boot.autoconfigure.ldap.LdapAutoConfiguration;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import; import org.springframework.context.annotation.Import;
import org.springframework.context.annotation.Primary;
import org.springframework.ldap.core.DirContextOperations;
import org.springframework.ldap.core.support.BaseLdapPathContextSource; import org.springframework.ldap.core.support.BaseLdapPathContextSource;
import org.springframework.ldap.core.support.LdapContextSource; import org.springframework.ldap.core.support.LdapContextSource;
import org.springframework.security.authentication.AuthenticationManager; import org.springframework.security.authentication.AuthenticationManager;
@ -16,70 +26,71 @@ import org.springframework.security.authentication.ReactiveAuthenticationManager
import org.springframework.security.authentication.ReactiveAuthenticationManagerAdapter; import org.springframework.security.authentication.ReactiveAuthenticationManagerAdapter;
import org.springframework.security.config.annotation.web.reactive.EnableWebFluxSecurity; import org.springframework.security.config.annotation.web.reactive.EnableWebFluxSecurity;
import org.springframework.security.config.web.server.ServerHttpSecurity; import org.springframework.security.config.web.server.ServerHttpSecurity;
import org.springframework.security.core.GrantedAuthority;
import org.springframework.security.core.userdetails.UserDetails;
import org.springframework.security.ldap.authentication.AbstractLdapAuthenticationProvider; import org.springframework.security.ldap.authentication.AbstractLdapAuthenticationProvider;
import org.springframework.security.ldap.authentication.BindAuthenticator; import org.springframework.security.ldap.authentication.BindAuthenticator;
import org.springframework.security.ldap.authentication.LdapAuthenticationProvider; import org.springframework.security.ldap.authentication.LdapAuthenticationProvider;
import org.springframework.security.ldap.authentication.ad.ActiveDirectoryLdapAuthenticationProvider; import org.springframework.security.ldap.authentication.ad.ActiveDirectoryLdapAuthenticationProvider;
import org.springframework.security.ldap.search.FilterBasedLdapUserSearch; import org.springframework.security.ldap.search.FilterBasedLdapUserSearch;
import org.springframework.security.ldap.search.LdapUserSearch; import org.springframework.security.ldap.search.LdapUserSearch;
import org.springframework.security.ldap.userdetails.LdapUserDetailsMapper;
import org.springframework.security.web.server.SecurityWebFilterChain; import org.springframework.security.web.server.SecurityWebFilterChain;
@Configuration @Configuration
@EnableWebFluxSecurity @EnableWebFluxSecurity
@ConditionalOnProperty(value = "auth.type", havingValue = "LDAP") @ConditionalOnProperty(value = "auth.type", havingValue = "LDAP")
@Import(LdapAutoConfiguration.class) @Import(LdapAutoConfiguration.class)
@EnableConfigurationProperties(LdapProperties.class)
@RequiredArgsConstructor
@Slf4j @Slf4j
public class LdapSecurityConfig extends AbstractAuthSecurityConfig { public class LdapSecurityConfig {
@Value("${spring.ldap.urls}") private final LdapProperties props;
private String ldapUrls;
@Value("${spring.ldap.dn.pattern:#{null}}")
private String ldapUserDnPattern;
@Value("${spring.ldap.adminUser:#{null}}")
private String adminUser;
@Value("${spring.ldap.adminPassword:#{null}}")
private String adminPassword;
@Value("${spring.ldap.userFilter.searchBase:#{null}}")
private String userFilterSearchBase;
@Value("${spring.ldap.userFilter.searchFilter:#{null}}")
private String userFilterSearchFilter;
@Value("${oauth2.ldap.activeDirectory:false}")
private boolean isActiveDirectory;
@Value("${oauth2.ldap.aсtiveDirectory.domain:#{null}}")
private String activeDirectoryDomain;
@Bean @Bean
public ReactiveAuthenticationManager authenticationManager(BaseLdapPathContextSource contextSource) { public ReactiveAuthenticationManager authenticationManager(BaseLdapPathContextSource contextSource,
ApplicationContext context,
@Nullable AccessControlService acs) {
var rbacEnabled = acs != null && acs.isRbacEnabled();
BindAuthenticator ba = new BindAuthenticator(contextSource); BindAuthenticator ba = new BindAuthenticator(contextSource);
if (ldapUserDnPattern != null) { if (props.getBase() != null) {
ba.setUserDnPatterns(new String[] {ldapUserDnPattern}); ba.setUserDnPatterns(new String[] {props.getBase()});
} }
if (userFilterSearchFilter != null) { if (props.getUserFilterSearchFilter() != null) {
LdapUserSearch userSearch = LdapUserSearch userSearch =
new FilterBasedLdapUserSearch(userFilterSearchBase, userFilterSearchFilter, contextSource); new FilterBasedLdapUserSearch(props.getUserFilterSearchBase(), props.getUserFilterSearchFilter(),
contextSource);
ba.setUserSearch(userSearch); ba.setUserSearch(userSearch);
} }
AbstractLdapAuthenticationProvider authenticationProvider; AbstractLdapAuthenticationProvider authenticationProvider;
if (!isActiveDirectory) { if (!props.isActiveDirectory()) {
authenticationProvider = new LdapAuthenticationProvider(ba); authenticationProvider = rbacEnabled
? new LdapAuthenticationProvider(ba, new RbacLdapAuthoritiesExtractor(context))
: new LdapAuthenticationProvider(ba);
} else { } else {
authenticationProvider = new ActiveDirectoryLdapAuthenticationProvider(activeDirectoryDomain, ldapUrls); authenticationProvider = new ActiveDirectoryLdapAuthenticationProvider(props.getActiveDirectoryDomain(),
props.getUrls()); // TODO Issue #3741
authenticationProvider.setUseAuthenticationRequestCredentials(true); authenticationProvider.setUseAuthenticationRequestCredentials(true);
} }
if (rbacEnabled) {
authenticationProvider.setUserDetailsContextMapper(new UserDetailsMapper());
}
AuthenticationManager am = new ProviderManager(List.of(authenticationProvider)); AuthenticationManager am = new ProviderManager(List.of(authenticationProvider));
return new ReactiveAuthenticationManagerAdapter(am); return new ReactiveAuthenticationManagerAdapter(am);
} }
@Bean @Bean
@Primary
public BaseLdapPathContextSource contextSource() { public BaseLdapPathContextSource contextSource() {
LdapContextSource ctx = new LdapContextSource(); LdapContextSource ctx = new LdapContextSource();
ctx.setUrl(ldapUrls); ctx.setUrl(props.getUrls());
ctx.setUserDn(adminUser); ctx.setUserDn(props.getAdminUser());
ctx.setPassword(adminPassword); ctx.setPassword(props.getAdminPassword());
ctx.afterPropertiesSet(); ctx.afterPropertiesSet();
return ctx; return ctx;
} }
@ -87,20 +98,35 @@ public class LdapSecurityConfig extends AbstractAuthSecurityConfig {
@Bean @Bean
public SecurityWebFilterChain configureLdap(ServerHttpSecurity http) { public SecurityWebFilterChain configureLdap(ServerHttpSecurity http) {
log.info("Configuring LDAP authentication."); log.info("Configuring LDAP authentication.");
if (isActiveDirectory) { if (props.isActiveDirectory()) {
log.info("Active Directory support for LDAP has been enabled."); log.info("Active Directory support for LDAP has been enabled.");
} }
http return http
.authorizeExchange() .authorizeExchange()
.pathMatchers(AUTH_WHITELIST) .pathMatchers(AUTH_WHITELIST)
.permitAll() .permitAll()
.anyExchange() .anyExchange()
.authenticated() .authenticated()
.and() .and()
.httpBasic(); .formLogin()
return http.csrf().disable().build(); .and()
.logout()
.and()
.csrf().disable()
.build();
}
private static class UserDetailsMapper extends LdapUserDetailsMapper {
@Override
public UserDetails mapUserFromContext(DirContextOperations ctx, String username,
Collection<? extends GrantedAuthority> authorities) {
UserDetails userDetails = super.mapUserFromContext(ctx, username, authorities);
return new RbacLdapUser(userDetails);
}
} }
} }

View file

@ -115,7 +115,7 @@ public class OAuthSecurityConfig extends AbstractAuthSecurityConfig {
@Nullable @Nullable
private ProviderAuthorityExtractor getExtractor(final String providerId, AccessControlService acs) { private ProviderAuthorityExtractor getExtractor(final String providerId, AccessControlService acs) {
final String provider = getProviderByProviderId(providerId); final String provider = getProviderByProviderId(providerId);
Optional<ProviderAuthorityExtractor> extractor = acs.getExtractors() Optional<ProviderAuthorityExtractor> extractor = acs.getOauthExtractors()
.stream() .stream()
.filter(e -> e.isApplicable(provider)) .filter(e -> e.isApplicable(provider))
.findFirst(); .findFirst();

View file

@ -0,0 +1,60 @@
package com.provectus.kafka.ui.config.auth;
import java.util.Collection;
import java.util.stream.Collectors;
import org.springframework.security.core.GrantedAuthority;
import org.springframework.security.core.userdetails.UserDetails;
public class RbacLdapUser implements UserDetails, RbacUser {
private final UserDetails userDetails;
public RbacLdapUser(UserDetails userDetails) {
this.userDetails = userDetails;
}
@Override
public String name() {
return userDetails.getUsername();
}
@Override
public Collection<String> groups() {
return userDetails.getAuthorities().stream().map(GrantedAuthority::getAuthority).collect(Collectors.toSet());
}
@Override
public Collection<? extends GrantedAuthority> getAuthorities() {
return userDetails.getAuthorities();
}
@Override
public String getPassword() {
return userDetails.getPassword();
}
@Override
public String getUsername() {
return userDetails.getUsername();
}
@Override
public boolean isAccountNonExpired() {
return userDetails.isAccountNonExpired();
}
@Override
public boolean isAccountNonLocked() {
return userDetails.isAccountNonLocked();
}
@Override
public boolean isCredentialsNonExpired() {
return userDetails.isCredentialsNonExpired();
}
@Override
public boolean isEnabled() {
return userDetails.isEnabled();
}
}

View file

@ -0,0 +1,21 @@
package com.provectus.kafka.ui.config.auth.condition;
import org.springframework.boot.autoconfigure.condition.AllNestedConditions;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
public class ActiveDirectoryCondition extends AllNestedConditions {
public ActiveDirectoryCondition() {
super(ConfigurationPhase.PARSE_CONFIGURATION);
}
@ConditionalOnProperty(value = "auth.type", havingValue = "LDAP")
public static class OnAuthType {
}
@ConditionalOnProperty(value = "${oauth2.ldap.activeDirectory}:false", havingValue = "true", matchIfMissing = false)
public static class OnActiveDirectory {
}
}

View file

@ -43,9 +43,6 @@ import reactor.core.scheduler.Schedulers;
@Slf4j @Slf4j
public class MessagesController extends AbstractController implements MessagesApi { public class MessagesController extends AbstractController implements MessagesApi {
private static final int MAX_LOAD_RECORD_LIMIT = 100;
private static final int DEFAULT_LOAD_RECORD_LIMIT = 20;
private final MessagesService messagesService; private final MessagesService messagesService;
private final DeserializationService deserializationService; private final DeserializationService deserializationService;
private final AccessControlService accessControlService; private final AccessControlService accessControlService;
@ -91,8 +88,6 @@ public class MessagesController extends AbstractController implements MessagesAp
seekType = seekType != null ? seekType : SeekTypeDTO.BEGINNING; seekType = seekType != null ? seekType : SeekTypeDTO.BEGINNING;
seekDirection = seekDirection != null ? seekDirection : SeekDirectionDTO.FORWARD; seekDirection = seekDirection != null ? seekDirection : SeekDirectionDTO.FORWARD;
filterQueryType = filterQueryType != null ? filterQueryType : MessageFilterTypeDTO.STRING_CONTAINS; filterQueryType = filterQueryType != null ? filterQueryType : MessageFilterTypeDTO.STRING_CONTAINS;
int recordsLimit =
Optional.ofNullable(limit).map(s -> Math.min(s, MAX_LOAD_RECORD_LIMIT)).orElse(DEFAULT_LOAD_RECORD_LIMIT);
var positions = new ConsumerPosition( var positions = new ConsumerPosition(
seekType, seekType,
@ -103,7 +98,7 @@ public class MessagesController extends AbstractController implements MessagesAp
ResponseEntity.ok( ResponseEntity.ok(
messagesService.loadMessages( messagesService.loadMessages(
getCluster(clusterName), topicName, positions, q, filterQueryType, getCluster(clusterName), topicName, positions, q, filterQueryType,
recordsLimit, seekDirection, keySerde, valueSerde) limit, seekDirection, keySerde, valueSerde)
) )
); );

View file

@ -11,8 +11,6 @@ import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.Optional; import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.kafka.common.Node; import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.TopicPartition;
@ -82,15 +80,8 @@ public class ConsumerGroupMapper {
InternalConsumerGroup c, T consumerGroup) { InternalConsumerGroup c, T consumerGroup) {
consumerGroup.setGroupId(c.getGroupId()); consumerGroup.setGroupId(c.getGroupId());
consumerGroup.setMembers(c.getMembers().size()); consumerGroup.setMembers(c.getMembers().size());
int numTopics = Stream.concat(
c.getOffsets().keySet().stream().map(TopicPartition::topic),
c.getMembers().stream()
.flatMap(m -> m.getAssignment().stream().map(TopicPartition::topic))
).collect(Collectors.toSet()).size();
consumerGroup.setMessagesBehind(c.getMessagesBehind()); consumerGroup.setMessagesBehind(c.getMessagesBehind());
consumerGroup.setTopics(numTopics); consumerGroup.setTopics(c.getTopicNum());
consumerGroup.setSimple(c.isSimple()); consumerGroup.setSimple(c.isSimple());
Optional.ofNullable(c.getState()) Optional.ofNullable(c.getState())

View file

@ -1,6 +1,7 @@
package com.provectus.kafka.ui.model; package com.provectus.kafka.ui.model;
import java.math.BigDecimal; import java.math.BigDecimal;
import javax.annotation.Nullable;
import lombok.Data; import lombok.Data;
import org.apache.kafka.common.Node; import org.apache.kafka.common.Node;
@ -10,15 +11,27 @@ public class InternalBroker {
private final Integer id; private final Integer id;
private final String host; private final String host;
private final Integer port; private final Integer port;
private final BigDecimal bytesInPerSec; private final @Nullable BigDecimal bytesInPerSec;
private final BigDecimal bytesOutPerSec; private final @Nullable BigDecimal bytesOutPerSec;
private final @Nullable Integer partitionsLeader;
private final @Nullable Integer partitions;
private final @Nullable Integer inSyncPartitions;
private final @Nullable BigDecimal leadersSkew;
private final @Nullable BigDecimal partitionsSkew;
public InternalBroker(Node node, Statistics statistics) { public InternalBroker(Node node,
PartitionDistributionStats partitionDistribution,
Statistics statistics) {
this.id = node.id(); this.id = node.id();
this.host = node.host(); this.host = node.host();
this.port = node.port(); this.port = node.port();
this.bytesInPerSec = statistics.getMetrics().getBrokerBytesInPerSec().get(node.id()); this.bytesInPerSec = statistics.getMetrics().getBrokerBytesInPerSec().get(node.id());
this.bytesOutPerSec = statistics.getMetrics().getBrokerBytesOutPerSec().get(node.id()); this.bytesOutPerSec = statistics.getMetrics().getBrokerBytesOutPerSec().get(node.id());
this.partitionsLeader = partitionDistribution.getPartitionLeaders().get(node);
this.partitions = partitionDistribution.getPartitionsCount().get(node);
this.inSyncPartitions = partitionDistribution.getInSyncPartitions().get(node);
this.leadersSkew = partitionDistribution.leadersSkew(node);
this.partitionsSkew = partitionDistribution.partitionsSkew(node);
} }
} }

View file

@ -5,6 +5,7 @@ import java.util.Map;
import java.util.Optional; import java.util.Optional;
import java.util.Set; import java.util.Set;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import java.util.stream.Stream;
import lombok.Builder; import lombok.Builder;
import lombok.Data; import lombok.Data;
import org.apache.kafka.clients.admin.ConsumerGroupDescription; import org.apache.kafka.clients.admin.ConsumerGroupDescription;
@ -21,6 +22,7 @@ public class InternalConsumerGroup {
private final Map<TopicPartition, Long> offsets; private final Map<TopicPartition, Long> offsets;
private final Map<TopicPartition, Long> endOffsets; private final Map<TopicPartition, Long> endOffsets;
private final Long messagesBehind; private final Long messagesBehind;
private final Integer topicNum;
private final String partitionAssignor; private final String partitionAssignor;
private final ConsumerGroupState state; private final ConsumerGroupState state;
private final Node coordinator; private final Node coordinator;
@ -44,22 +46,12 @@ public class InternalConsumerGroup {
builder.simple(description.isSimpleConsumerGroup()); builder.simple(description.isSimpleConsumerGroup());
builder.state(description.state()); builder.state(description.state());
builder.partitionAssignor(description.partitionAssignor()); builder.partitionAssignor(description.partitionAssignor());
builder.members( Collection<InternalMember> internalMembers = initInternalMembers(description);
description.members().stream() builder.members(internalMembers);
.map(m ->
InternalConsumerGroup.InternalMember.builder()
.assignment(m.assignment().topicPartitions())
.clientId(m.clientId())
.groupInstanceId(m.groupInstanceId().orElse(""))
.consumerId(m.consumerId())
.clientId(m.clientId())
.host(m.host())
.build()
).collect(Collectors.toList())
);
builder.offsets(groupOffsets); builder.offsets(groupOffsets);
builder.endOffsets(topicEndOffsets); builder.endOffsets(topicEndOffsets);
builder.messagesBehind(calculateMessagesBehind(groupOffsets, topicEndOffsets)); builder.messagesBehind(calculateMessagesBehind(groupOffsets, topicEndOffsets));
builder.topicNum(calculateTopicNum(groupOffsets, internalMembers));
Optional.ofNullable(description.coordinator()).ifPresent(builder::coordinator); Optional.ofNullable(description.coordinator()).ifPresent(builder::coordinator);
return builder.build(); return builder.build();
} }
@ -80,4 +72,31 @@ public class InternalConsumerGroup {
return messagesBehind; return messagesBehind;
} }
private static Integer calculateTopicNum(Map<TopicPartition, Long> offsets, Collection<InternalMember> members) {
long topicNum = Stream.concat(
offsets.keySet().stream().map(TopicPartition::topic),
members.stream()
.flatMap(m -> m.getAssignment().stream().map(TopicPartition::topic))
).distinct().count();
return Integer.valueOf((int) topicNum);
}
private static Collection<InternalMember> initInternalMembers(ConsumerGroupDescription description) {
return description.members().stream()
.map(m ->
InternalConsumerGroup.InternalMember.builder()
.assignment(m.assignment().topicPartitions())
.clientId(m.clientId())
.groupInstanceId(m.groupInstanceId().orElse(""))
.consumerId(m.consumerId())
.clientId(m.clientId())
.host(m.host())
.build()
).collect(Collectors.toList());
}
} }

View file

@ -0,0 +1,93 @@
package com.provectus.kafka.ui.model;
import java.math.BigDecimal;
import java.math.MathContext;
import java.util.HashMap;
import java.util.Map;
import javax.annotation.Nullable;
import lombok.AccessLevel;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartitionInfo;
@RequiredArgsConstructor(access = AccessLevel.PRIVATE)
@Getter
@Slf4j
public class PartitionDistributionStats {
// avg skew will show unuseful results on low number of partitions
private static final int MIN_PARTITIONS_FOR_SKEW_CALCULATION = 50;
private static final MathContext ROUNDING_MATH_CTX = new MathContext(3);
private final Map<Node, Integer> partitionLeaders;
private final Map<Node, Integer> partitionsCount;
private final Map<Node, Integer> inSyncPartitions;
private final double avgLeadersCntPerBroker;
private final double avgPartitionsPerBroker;
private final boolean skewCanBeCalculated;
public static PartitionDistributionStats create(Statistics stats) {
return create(stats, MIN_PARTITIONS_FOR_SKEW_CALCULATION);
}
static PartitionDistributionStats create(Statistics stats, int minPartitionsForSkewCalculation) {
var partitionLeaders = new HashMap<Node, Integer>();
var partitionsReplicated = new HashMap<Node, Integer>();
var isr = new HashMap<Node, Integer>();
int partitionsCnt = 0;
for (TopicDescription td : stats.getTopicDescriptions().values()) {
for (TopicPartitionInfo tp : td.partitions()) {
partitionsCnt++;
tp.replicas().forEach(r -> incr(partitionsReplicated, r));
tp.isr().forEach(r -> incr(isr, r));
if (tp.leader() != null) {
incr(partitionLeaders, tp.leader());
}
}
}
int nodesWithPartitions = partitionsReplicated.size();
int partitionReplications = partitionsReplicated.values().stream().mapToInt(i -> i).sum();
var avgPartitionsPerBroker = nodesWithPartitions == 0 ? 0 : ((double) partitionReplications) / nodesWithPartitions;
int nodesWithLeaders = partitionLeaders.size();
int leadersCnt = partitionLeaders.values().stream().mapToInt(i -> i).sum();
var avgLeadersCntPerBroker = nodesWithLeaders == 0 ? 0 : ((double) leadersCnt) / nodesWithLeaders;
return new PartitionDistributionStats(
partitionLeaders,
partitionsReplicated,
isr,
avgLeadersCntPerBroker,
avgPartitionsPerBroker,
partitionsCnt >= minPartitionsForSkewCalculation
);
}
private static void incr(Map<Node, Integer> map, Node n) {
map.compute(n, (k, c) -> c == null ? 1 : ++c);
}
@Nullable
public BigDecimal partitionsSkew(Node node) {
return calculateAvgSkew(partitionsCount.get(node), avgPartitionsPerBroker);
}
@Nullable
public BigDecimal leadersSkew(Node node) {
return calculateAvgSkew(partitionLeaders.get(node), avgLeadersCntPerBroker);
}
// Returns difference (in percents) from average value, null if it can't be calculated
@Nullable
private BigDecimal calculateAvgSkew(@Nullable Integer value, double avgValue) {
if (avgValue == 0 || !skewCanBeCalculated) {
return null;
}
value = value == null ? 0 : value;
return new BigDecimal((value - avgValue) / avgValue * 100.0).round(ROUNDING_MATH_CTX);
}
}

View file

@ -123,11 +123,11 @@ public class ConsumerRecordDeserializer {
} }
private static Long getKeySize(ConsumerRecord<Bytes, Bytes> consumerRecord) { private static Long getKeySize(ConsumerRecord<Bytes, Bytes> consumerRecord) {
return consumerRecord.key() != null ? (long) consumerRecord.key().get().length : null; return consumerRecord.key() != null ? (long) consumerRecord.serializedKeySize() : null;
} }
private static Long getValueSize(ConsumerRecord<Bytes, Bytes> consumerRecord) { private static Long getValueSize(ConsumerRecord<Bytes, Bytes> consumerRecord) {
return consumerRecord.value() != null ? (long) consumerRecord.value().get().length : null; return consumerRecord.value() != null ? (long) consumerRecord.serializedValueSize() : null;
} }
private static int headerSize(Header header) { private static int headerSize(Header header) {

View file

@ -122,8 +122,6 @@ public class SerdesInitializer {
registeredSerdes, registeredSerdes,
Optional.ofNullable(clusterProperties.getDefaultKeySerde()) Optional.ofNullable(clusterProperties.getDefaultKeySerde())
.map(name -> Preconditions.checkNotNull(registeredSerdes.get(name), "Default key serde not found")) .map(name -> Preconditions.checkNotNull(registeredSerdes.get(name), "Default key serde not found"))
.or(() -> Optional.ofNullable(registeredSerdes.get(SchemaRegistrySerde.name())))
.or(() -> Optional.ofNullable(registeredSerdes.get(ProtobufFileSerde.name())))
.orElse(null), .orElse(null),
Optional.ofNullable(clusterProperties.getDefaultValueSerde()) Optional.ofNullable(clusterProperties.getDefaultValueSerde())
.map(name -> Preconditions.checkNotNull(registeredSerdes.get(name), "Default value serde not found")) .map(name -> Preconditions.checkNotNull(registeredSerdes.get(name), "Default value serde not found"))

View file

@ -1,33 +1,36 @@
package com.provectus.kafka.ui.service; package com.provectus.kafka.ui.service;
import com.provectus.kafka.ui.config.ClustersProperties;
import com.provectus.kafka.ui.model.KafkaCluster; import com.provectus.kafka.ui.model.KafkaCluster;
import com.provectus.kafka.ui.util.SslPropertiesUtil; import com.provectus.kafka.ui.util.SslPropertiesUtil;
import java.io.Closeable; import java.io.Closeable;
import java.time.Instant; import java.time.Instant;
import java.util.Map; import java.util.Map;
import java.util.Optional;
import java.util.Properties; import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import lombok.RequiredArgsConstructor;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig; import org.apache.kafka.clients.admin.AdminClientConfig;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
@Service @Service
@RequiredArgsConstructor
@Slf4j @Slf4j
public class AdminClientServiceImpl implements AdminClientService, Closeable { public class AdminClientServiceImpl implements AdminClientService, Closeable {
private static final int DEFAULT_CLIENT_TIMEOUT_MS = 30_000;
private static final AtomicLong CLIENT_ID_SEQ = new AtomicLong(); private static final AtomicLong CLIENT_ID_SEQ = new AtomicLong();
private final Map<String, ReactiveAdminClient> adminClientCache = new ConcurrentHashMap<>(); private final Map<String, ReactiveAdminClient> adminClientCache = new ConcurrentHashMap<>();
@Setter // used in tests private final int clientTimeout;
@Value("${kafka.admin-client-timeout:30000}")
private int clientTimeout; public AdminClientServiceImpl(ClustersProperties clustersProperties) {
this.clientTimeout = Optional.ofNullable(clustersProperties.getAdminClientTimeout())
.orElse(DEFAULT_CLIENT_TIMEOUT_MS);
}
@Override @Override
public Mono<ReactiveAdminClient> get(KafkaCluster cluster) { public Mono<ReactiveAdminClient> get(KafkaCluster cluster) {
@ -42,7 +45,7 @@ public class AdminClientServiceImpl implements AdminClientService, Closeable {
SslPropertiesUtil.addKafkaSslProperties(cluster.getOriginalProperties().getSsl(), properties); SslPropertiesUtil.addKafkaSslProperties(cluster.getOriginalProperties().getSsl(), properties);
properties.putAll(cluster.getProperties()); properties.putAll(cluster.getProperties());
properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.getBootstrapServers()); properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.getBootstrapServers());
properties.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, clientTimeout); properties.putIfAbsent(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, clientTimeout);
properties.putIfAbsent( properties.putIfAbsent(
AdminClientConfig.CLIENT_ID_CONFIG, AdminClientConfig.CLIENT_ID_CONFIG,
"kafka-ui-admin-" + Instant.now().getEpochSecond() + "-" + CLIENT_ID_SEQ.incrementAndGet() "kafka-ui-admin-" + Instant.now().getEpochSecond() + "-" + CLIENT_ID_SEQ.incrementAndGet()

View file

@ -10,6 +10,7 @@ import com.provectus.kafka.ui.model.BrokersLogdirsDTO;
import com.provectus.kafka.ui.model.InternalBroker; import com.provectus.kafka.ui.model.InternalBroker;
import com.provectus.kafka.ui.model.InternalBrokerConfig; import com.provectus.kafka.ui.model.InternalBrokerConfig;
import com.provectus.kafka.ui.model.KafkaCluster; import com.provectus.kafka.ui.model.KafkaCluster;
import com.provectus.kafka.ui.model.PartitionDistributionStats;
import com.provectus.kafka.ui.service.metrics.RawMetric; import com.provectus.kafka.ui.service.metrics.RawMetric;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
@ -64,11 +65,13 @@ public class BrokerService {
} }
public Flux<InternalBroker> getBrokers(KafkaCluster cluster) { public Flux<InternalBroker> getBrokers(KafkaCluster cluster) {
var stats = statisticsCache.get(cluster);
var partitionsDistribution = PartitionDistributionStats.create(stats);
return adminClientService return adminClientService
.get(cluster) .get(cluster)
.flatMap(ReactiveAdminClient::describeCluster) .flatMap(ReactiveAdminClient::describeCluster)
.map(description -> description.getNodes().stream() .map(description -> description.getNodes().stream()
.map(node -> new InternalBroker(node, statisticsCache.get(cluster))) .map(node -> new InternalBroker(node, partitionsDistribution, stats))
.collect(Collectors.toList())) .collect(Collectors.toList()))
.flatMapMany(Flux::fromIterable); .flatMapMany(Flux::fromIterable);
} }

View file

@ -101,6 +101,9 @@ public class ConsumerGroupService {
public record ConsumerGroupsPage(List<InternalConsumerGroup> consumerGroups, int totalPages) { public record ConsumerGroupsPage(List<InternalConsumerGroup> consumerGroups, int totalPages) {
} }
private record GroupWithDescr(InternalConsumerGroup icg, ConsumerGroupDescription cgd) {
}
public Mono<ConsumerGroupsPage> getConsumerGroupsPage( public Mono<ConsumerGroupsPage> getConsumerGroupsPage(
KafkaCluster cluster, KafkaCluster cluster,
int pageNum, int pageNum,
@ -159,22 +162,19 @@ public class ConsumerGroupService {
sortAndPaginate(descriptions.values(), comparator, pageNum, perPage, sortOrderDto).toList()); sortAndPaginate(descriptions.values(), comparator, pageNum, perPage, sortOrderDto).toList());
} }
case MESSAGES_BEHIND -> { case MESSAGES_BEHIND -> {
record GroupWithDescr(InternalConsumerGroup icg, ConsumerGroupDescription cgd) { }
Comparator<GroupWithDescr> comparator = Comparator.comparingLong(gwd -> Comparator<GroupWithDescr> comparator = Comparator.comparingLong(gwd ->
gwd.icg.getMessagesBehind() == null ? 0L : gwd.icg.getMessagesBehind()); gwd.icg.getMessagesBehind() == null ? 0L : gwd.icg.getMessagesBehind());
var groupNames = groups.stream().map(ConsumerGroupListing::groupId).toList(); yield loadDescriptionsByInternalConsumerGroups(ac, groups, comparator, pageNum, perPage, sortOrderDto);
yield ac.describeConsumerGroups(groupNames)
.flatMap(descriptionsMap -> {
List<ConsumerGroupDescription> descriptions = descriptionsMap.values().stream().toList();
return getConsumerGroups(ac, descriptions)
.map(icg -> Streams.zip(icg.stream(), descriptions.stream(), GroupWithDescr::new).toList())
.map(gwd -> sortAndPaginate(gwd, comparator, pageNum, perPage, sortOrderDto)
.map(GroupWithDescr::cgd).toList());
} }
);
case TOPIC_NUM -> {
Comparator<GroupWithDescr> comparator = Comparator.comparingInt(gwd -> gwd.icg.getTopicNum());
yield loadDescriptionsByInternalConsumerGroups(ac, groups, comparator, pageNum, perPage, sortOrderDto);
} }
}; };
} }
@ -209,6 +209,27 @@ public class ConsumerGroupService {
.map(cgs -> new ArrayList<>(cgs.values())); .map(cgs -> new ArrayList<>(cgs.values()));
} }
private Mono<List<ConsumerGroupDescription>> loadDescriptionsByInternalConsumerGroups(ReactiveAdminClient ac,
List<ConsumerGroupListing> groups,
Comparator<GroupWithDescr> comparator,
int pageNum,
int perPage,
SortOrderDTO sortOrderDto) {
var groupNames = groups.stream().map(ConsumerGroupListing::groupId).toList();
return ac.describeConsumerGroups(groupNames)
.flatMap(descriptionsMap -> {
List<ConsumerGroupDescription> descriptions = descriptionsMap.values().stream().toList();
return getConsumerGroups(ac, descriptions)
.map(icg -> Streams.zip(icg.stream(), descriptions.stream(), GroupWithDescr::new).toList())
.map(gwd -> sortAndPaginate(gwd, comparator, pageNum, perPage, sortOrderDto)
.map(GroupWithDescr::cgd).toList());
}
);
}
public Mono<InternalConsumerGroup> getConsumerGroupDetail(KafkaCluster cluster, public Mono<InternalConsumerGroup> getConsumerGroupDetail(KafkaCluster cluster,
String consumerGroupId) { String consumerGroupId) {
return adminClientService.get(cluster) return adminClientService.get(cluster)

View file

@ -2,6 +2,7 @@ package com.provectus.kafka.ui.service;
import com.provectus.kafka.ui.client.RetryingKafkaConnectClient; import com.provectus.kafka.ui.client.RetryingKafkaConnectClient;
import com.provectus.kafka.ui.config.ClustersProperties; import com.provectus.kafka.ui.config.ClustersProperties;
import com.provectus.kafka.ui.config.WebclientProperties;
import com.provectus.kafka.ui.connect.api.KafkaConnectClientApi; import com.provectus.kafka.ui.connect.api.KafkaConnectClientApi;
import com.provectus.kafka.ui.emitter.PollingSettings; import com.provectus.kafka.ui.emitter.PollingSettings;
import com.provectus.kafka.ui.model.ApplicationPropertyValidationDTO; import com.provectus.kafka.ui.model.ApplicationPropertyValidationDTO;
@ -22,9 +23,7 @@ import java.util.Optional;
import java.util.Properties; import java.util.Properties;
import java.util.stream.Stream; import java.util.stream.Stream;
import javax.annotation.Nullable; import javax.annotation.Nullable;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.springframework.util.unit.DataSize; import org.springframework.util.unit.DataSize;
import org.springframework.web.reactive.function.client.WebClient; import org.springframework.web.reactive.function.client.WebClient;
@ -34,12 +33,18 @@ import reactor.util.function.Tuple2;
import reactor.util.function.Tuples; import reactor.util.function.Tuples;
@Service @Service
@RequiredArgsConstructor
@Slf4j @Slf4j
public class KafkaClusterFactory { public class KafkaClusterFactory {
@Value("${webclient.max-in-memory-buffer-size:20MB}") private static final DataSize DEFAULT_WEBCLIENT_BUFFER = DataSize.parse("20MB");
private DataSize maxBuffSize;
private final DataSize webClientMaxBuffSize;
public KafkaClusterFactory(WebclientProperties webclientProperties) {
this.webClientMaxBuffSize = Optional.ofNullable(webclientProperties.getMaxInMemoryBufferSize())
.map(DataSize::parse)
.orElse(DEFAULT_WEBCLIENT_BUFFER);
}
public KafkaCluster create(ClustersProperties properties, public KafkaCluster create(ClustersProperties properties,
ClustersProperties.Cluster clusterProperties) { ClustersProperties.Cluster clusterProperties) {
@ -140,7 +145,7 @@ public class KafkaClusterFactory {
url -> new RetryingKafkaConnectClient( url -> new RetryingKafkaConnectClient(
connectCluster.toBuilder().address(url).build(), connectCluster.toBuilder().address(url).build(),
cluster.getSsl(), cluster.getSsl(),
maxBuffSize webClientMaxBuffSize
), ),
ReactiveFailover.CONNECTION_REFUSED_EXCEPTION_FILTER, ReactiveFailover.CONNECTION_REFUSED_EXCEPTION_FILTER,
"No alive connect instances available", "No alive connect instances available",
@ -158,7 +163,7 @@ public class KafkaClusterFactory {
WebClient webClient = new WebClientConfigurator() WebClient webClient = new WebClientConfigurator()
.configureSsl(clusterProperties.getSsl(), clusterProperties.getSchemaRegistrySsl()) .configureSsl(clusterProperties.getSsl(), clusterProperties.getSchemaRegistrySsl())
.configureBasicAuth(auth.getUsername(), auth.getPassword()) .configureBasicAuth(auth.getUsername(), auth.getPassword())
.configureBufferSize(maxBuffSize) .configureBufferSize(webClientMaxBuffSize)
.build(); .build();
return ReactiveFailover.create( return ReactiveFailover.create(
parseUrlList(clusterProperties.getSchemaRegistry()), parseUrlList(clusterProperties.getSchemaRegistry()),
@ -181,7 +186,7 @@ public class KafkaClusterFactory {
clusterProperties.getKsqldbServerAuth(), clusterProperties.getKsqldbServerAuth(),
clusterProperties.getSsl(), clusterProperties.getSsl(),
clusterProperties.getKsqldbServerSsl(), clusterProperties.getKsqldbServerSsl(),
maxBuffSize webClientMaxBuffSize
), ),
ReactiveFailover.CONNECTION_REFUSED_EXCEPTION_FILTER, ReactiveFailover.CONNECTION_REFUSED_EXCEPTION_FILTER,
"No live ksqldb instances available", "No live ksqldb instances available",

View file

@ -109,6 +109,7 @@ public class KafkaConnectService {
private Stream<String> getStringsForSearch(FullConnectorInfoDTO fullConnectorInfo) { private Stream<String> getStringsForSearch(FullConnectorInfoDTO fullConnectorInfo) {
return Stream.of( return Stream.of(
fullConnectorInfo.getName(), fullConnectorInfo.getName(),
fullConnectorInfo.getConnect(),
fullConnectorInfo.getStatus().getState().getValue(), fullConnectorInfo.getStatus().getState().getValue(),
fullConnectorInfo.getType().getValue()); fullConnectorInfo.getType().getValue());
} }

View file

@ -1,6 +1,7 @@
package com.provectus.kafka.ui.service; package com.provectus.kafka.ui.service;
import com.google.common.util.concurrent.RateLimiter; import com.google.common.util.concurrent.RateLimiter;
import com.provectus.kafka.ui.config.ClustersProperties;
import com.provectus.kafka.ui.emitter.BackwardRecordEmitter; import com.provectus.kafka.ui.emitter.BackwardRecordEmitter;
import com.provectus.kafka.ui.emitter.ForwardRecordEmitter; import com.provectus.kafka.ui.emitter.ForwardRecordEmitter;
import com.provectus.kafka.ui.emitter.MessageFilters; import com.provectus.kafka.ui.emitter.MessageFilters;
@ -20,13 +21,13 @@ import com.provectus.kafka.ui.serdes.ProducerRecordCreator;
import com.provectus.kafka.ui.util.SslPropertiesUtil; import com.provectus.kafka.ui.util.SslPropertiesUtil;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Optional;
import java.util.Properties; import java.util.Properties;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.function.Predicate; import java.util.function.Predicate;
import java.util.function.UnaryOperator; import java.util.function.UnaryOperator;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import javax.annotation.Nullable; import javax.annotation.Nullable;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.admin.OffsetSpec; import org.apache.kafka.clients.admin.OffsetSpec;
@ -44,16 +45,35 @@ import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers; import reactor.core.scheduler.Schedulers;
@Service @Service
@RequiredArgsConstructor
@Slf4j @Slf4j
public class MessagesService { public class MessagesService {
private static final int DEFAULT_MAX_PAGE_SIZE = 500;
private static final int DEFAULT_PAGE_SIZE = 100;
// limiting UI messages rate to 20/sec in tailing mode // limiting UI messages rate to 20/sec in tailing mode
public static final int TAILING_UI_MESSAGE_THROTTLE_RATE = 20; private static final int TAILING_UI_MESSAGE_THROTTLE_RATE = 20;
private final AdminClientService adminClientService; private final AdminClientService adminClientService;
private final DeserializationService deserializationService; private final DeserializationService deserializationService;
private final ConsumerGroupService consumerGroupService; private final ConsumerGroupService consumerGroupService;
private final int maxPageSize;
private final int defaultPageSize;
public MessagesService(AdminClientService adminClientService,
DeserializationService deserializationService,
ConsumerGroupService consumerGroupService,
ClustersProperties properties) {
this.adminClientService = adminClientService;
this.deserializationService = deserializationService;
this.consumerGroupService = consumerGroupService;
var pollingProps = Optional.ofNullable(properties.getPolling())
.orElseGet(ClustersProperties.PollingProperties::new);
this.maxPageSize = Optional.ofNullable(pollingProps.getMaxPageSize())
.orElse(DEFAULT_MAX_PAGE_SIZE);
this.defaultPageSize = Optional.ofNullable(pollingProps.getDefaultPageSize())
.orElse(DEFAULT_PAGE_SIZE);
}
private Mono<TopicDescription> withExistingTopic(KafkaCluster cluster, String topicName) { private Mono<TopicDescription> withExistingTopic(KafkaCluster cluster, String topicName) {
return adminClientService.get(cluster) return adminClientService.get(cluster)
@ -139,7 +159,7 @@ public class MessagesService {
ConsumerPosition consumerPosition, ConsumerPosition consumerPosition,
@Nullable String query, @Nullable String query,
MessageFilterTypeDTO filterQueryType, MessageFilterTypeDTO filterQueryType,
int limit, @Nullable Integer pageSize,
SeekDirectionDTO seekDirection, SeekDirectionDTO seekDirection,
@Nullable String keySerde, @Nullable String keySerde,
@Nullable String valueSerde) { @Nullable String valueSerde) {
@ -147,7 +167,13 @@ public class MessagesService {
.flux() .flux()
.publishOn(Schedulers.boundedElastic()) .publishOn(Schedulers.boundedElastic())
.flatMap(td -> loadMessagesImpl(cluster, topic, consumerPosition, query, .flatMap(td -> loadMessagesImpl(cluster, topic, consumerPosition, query,
filterQueryType, limit, seekDirection, keySerde, valueSerde)); filterQueryType, fixPageSize(pageSize), seekDirection, keySerde, valueSerde));
}
private int fixPageSize(@Nullable Integer pageSize) {
return Optional.ofNullable(pageSize)
.filter(ps -> ps > 0 && ps <= maxPageSize)
.orElse(defaultPageSize);
} }
private Flux<TopicMessageEventDTO> loadMessagesImpl(KafkaCluster cluster, private Flux<TopicMessageEventDTO> loadMessagesImpl(KafkaCluster cluster,

View file

@ -43,8 +43,7 @@ class TopicAnalysisStats {
Long max; Long max;
final UpdateDoublesSketch sizeSketch = DoublesSketch.builder().build(); final UpdateDoublesSketch sizeSketch = DoublesSketch.builder().build();
void apply(byte[] bytes) { void apply(int len) {
int len = bytes.length;
sum += len; sum += len;
min = minNullable(min, len); min = minNullable(min, len);
max = maxNullable(max, len); max = maxNullable(max, len);
@ -98,7 +97,7 @@ class TopicAnalysisStats {
if (rec.key() != null) { if (rec.key() != null) {
byte[] keyBytes = rec.key().get(); byte[] keyBytes = rec.key().get();
keysSize.apply(keyBytes); keysSize.apply(rec.serializedKeySize());
uniqKeys.update(keyBytes); uniqKeys.update(keyBytes);
} else { } else {
nullKeys++; nullKeys++;
@ -106,7 +105,7 @@ class TopicAnalysisStats {
if (rec.value() != null) { if (rec.value() != null) {
byte[] valueBytes = rec.value().get(); byte[] valueBytes = rec.value().get();
valuesSize.apply(valueBytes); valuesSize.apply(rec.serializedValueSize());
uniqValues.update(valueBytes); uniqValues.update(valueBytes);
} else { } else {
nullValues++; nullValues++;

View file

@ -44,7 +44,7 @@ public class DataMasking {
public static DataMasking create(@Nullable List<ClustersProperties.Masking> config) { public static DataMasking create(@Nullable List<ClustersProperties.Masking> config) {
return new DataMasking( return new DataMasking(
Optional.ofNullable(config).orElse(List.of()).stream().map(property -> { Optional.ofNullable(config).orElse(List.of()).stream().map(property -> {
Preconditions.checkNotNull(property.getType(), "masking type not specifed"); Preconditions.checkNotNull(property.getType(), "masking type not specified");
Preconditions.checkArgument( Preconditions.checkArgument(
StringUtils.isNotEmpty(property.getTopicKeysPattern()) StringUtils.isNotEmpty(property.getTopicKeysPattern())
|| StringUtils.isNotEmpty(property.getTopicValuesPattern()), || StringUtils.isNotEmpty(property.getTopicValuesPattern()),

View file

@ -0,0 +1,28 @@
package com.provectus.kafka.ui.service.masking.policies;
import com.provectus.kafka.ui.config.ClustersProperties;
import com.provectus.kafka.ui.exception.ValidationException;
import java.util.regex.Pattern;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;
interface FieldsSelector {
static FieldsSelector create(ClustersProperties.Masking property) {
if (StringUtils.hasText(property.getFieldsNamePattern()) && !CollectionUtils.isEmpty(property.getFields())) {
throw new ValidationException("You can't provide both fieldNames & fieldsNamePattern for masking");
}
if (StringUtils.hasText(property.getFieldsNamePattern())) {
Pattern pattern = Pattern.compile(property.getFieldsNamePattern());
return f -> pattern.matcher(f).matches();
}
if (!CollectionUtils.isEmpty(property.getFields())) {
return f -> property.getFields().contains(f);
}
//no pattern, no field names - mean all fields should be masked
return fieldName -> true;
}
boolean shouldBeMasked(String fieldName);
}

View file

@ -15,8 +15,8 @@ class Mask extends MaskingPolicy {
private final UnaryOperator<String> masker; private final UnaryOperator<String> masker;
Mask(List<String> fieldNames, List<String> maskingChars) { Mask(FieldsSelector fieldsSelector, List<String> maskingChars) {
super(fieldNames); super(fieldsSelector);
this.masker = createMasker(maskingChars); this.masker = createMasker(maskingChars);
} }
@ -38,22 +38,13 @@ class Mask extends MaskingPolicy {
for (int i = 0; i < input.length(); i++) { for (int i = 0; i < input.length(); i++) {
int cp = input.codePointAt(i); int cp = input.codePointAt(i);
switch (Character.getType(cp)) { switch (Character.getType(cp)) {
case Character.SPACE_SEPARATOR: case Character.SPACE_SEPARATOR,
case Character.LINE_SEPARATOR: Character.LINE_SEPARATOR,
case Character.PARAGRAPH_SEPARATOR: Character.PARAGRAPH_SEPARATOR -> sb.appendCodePoint(cp); // keeping separators as-is
sb.appendCodePoint(cp); // keeping separators as-is case Character.UPPERCASE_LETTER -> sb.append(maskingChars.get(0));
break; case Character.LOWERCASE_LETTER -> sb.append(maskingChars.get(1));
case Character.UPPERCASE_LETTER: case Character.DECIMAL_DIGIT_NUMBER -> sb.append(maskingChars.get(2));
sb.append(maskingChars.get(0)); default -> sb.append(maskingChars.get(3));
break;
case Character.LOWERCASE_LETTER:
sb.append(maskingChars.get(1));
break;
case Character.DECIMAL_DIGIT_NUMBER:
sb.append(maskingChars.get(2));
break;
default:
sb.append(maskingChars.get(3));
} }
} }
return sb.toString(); return sb.toString();

View file

@ -2,46 +2,36 @@ package com.provectus.kafka.ui.service.masking.policies;
import com.fasterxml.jackson.databind.node.ContainerNode; import com.fasterxml.jackson.databind.node.ContainerNode;
import com.provectus.kafka.ui.config.ClustersProperties; import com.provectus.kafka.ui.config.ClustersProperties;
import java.util.List;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
@RequiredArgsConstructor @RequiredArgsConstructor
public abstract class MaskingPolicy { public abstract class MaskingPolicy {
public static MaskingPolicy create(ClustersProperties.Masking property) { public static MaskingPolicy create(ClustersProperties.Masking property) {
List<String> fields = property.getFields() == null FieldsSelector fieldsSelector = FieldsSelector.create(property);
? List.of() // empty list means that policy will be applied to all fields return switch (property.getType()) {
: property.getFields(); case REMOVE -> new Remove(fieldsSelector);
switch (property.getType()) { case REPLACE -> new Replace(
case REMOVE: fieldsSelector,
return new Remove(fields);
case REPLACE:
return new Replace(
fields,
property.getReplacement() == null property.getReplacement() == null
? Replace.DEFAULT_REPLACEMENT ? Replace.DEFAULT_REPLACEMENT
: property.getReplacement() : property.getReplacement()
); );
case MASK: case MASK -> new Mask(
return new Mask( fieldsSelector,
fields, property.getMaskingCharsReplacement() == null
property.getPattern() == null
? Mask.DEFAULT_PATTERN ? Mask.DEFAULT_PATTERN
: property.getPattern() : property.getMaskingCharsReplacement()
); );
default: };
throw new IllegalStateException("Unknown policy type: " + property.getType());
}
} }
//---------------------------------------------------------------- //----------------------------------------------------------------
// empty list means policy will be applied to all fields private final FieldsSelector fieldsSelector;
private final List<String> fieldNames;
protected boolean fieldShouldBeMasked(String fieldName) { protected boolean fieldShouldBeMasked(String fieldName) {
return fieldNames.isEmpty() || fieldNames.contains(fieldName); return fieldsSelector.shouldBeMasked(fieldName);
} }
public abstract ContainerNode<?> applyToJsonContainer(ContainerNode<?> node); public abstract ContainerNode<?> applyToJsonContainer(ContainerNode<?> node);

View file

@ -4,12 +4,12 @@ import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ArrayNode; import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ContainerNode; import com.fasterxml.jackson.databind.node.ContainerNode;
import com.fasterxml.jackson.databind.node.ObjectNode; import com.fasterxml.jackson.databind.node.ObjectNode;
import java.util.List;
class Remove extends MaskingPolicy { class Remove extends MaskingPolicy {
Remove(List<String> fieldNames) { Remove(FieldsSelector fieldsSelector) {
super(fieldNames); super(fieldsSelector);
} }
@Override @Override

View file

@ -6,7 +6,6 @@ import com.fasterxml.jackson.databind.node.ContainerNode;
import com.fasterxml.jackson.databind.node.ObjectNode; import com.fasterxml.jackson.databind.node.ObjectNode;
import com.fasterxml.jackson.databind.node.TextNode; import com.fasterxml.jackson.databind.node.TextNode;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import java.util.List;
class Replace extends MaskingPolicy { class Replace extends MaskingPolicy {
@ -14,8 +13,8 @@ class Replace extends MaskingPolicy {
private final String replacement; private final String replacement;
Replace(List<String> fieldNames, String replacementString) { Replace(FieldsSelector fieldsSelector, String replacementString) {
super(fieldNames); super(fieldsSelector);
this.replacement = Preconditions.checkNotNull(replacementString); this.replacement = Preconditions.checkNotNull(replacementString);
} }

View file

@ -12,6 +12,7 @@ import com.provectus.kafka.ui.model.rbac.AccessContext;
import com.provectus.kafka.ui.model.rbac.Permission; import com.provectus.kafka.ui.model.rbac.Permission;
import com.provectus.kafka.ui.model.rbac.Resource; import com.provectus.kafka.ui.model.rbac.Resource;
import com.provectus.kafka.ui.model.rbac.Role; import com.provectus.kafka.ui.model.rbac.Role;
import com.provectus.kafka.ui.model.rbac.Subject;
import com.provectus.kafka.ui.model.rbac.permission.ConnectAction; import com.provectus.kafka.ui.model.rbac.permission.ConnectAction;
import com.provectus.kafka.ui.model.rbac.permission.ConsumerGroupAction; import com.provectus.kafka.ui.model.rbac.permission.ConsumerGroupAction;
import com.provectus.kafka.ui.model.rbac.permission.SchemaAction; import com.provectus.kafka.ui.model.rbac.permission.SchemaAction;
@ -19,11 +20,11 @@ import com.provectus.kafka.ui.model.rbac.permission.TopicAction;
import com.provectus.kafka.ui.service.rbac.extractor.CognitoAuthorityExtractor; import com.provectus.kafka.ui.service.rbac.extractor.CognitoAuthorityExtractor;
import com.provectus.kafka.ui.service.rbac.extractor.GithubAuthorityExtractor; import com.provectus.kafka.ui.service.rbac.extractor.GithubAuthorityExtractor;
import com.provectus.kafka.ui.service.rbac.extractor.GoogleAuthorityExtractor; import com.provectus.kafka.ui.service.rbac.extractor.GoogleAuthorityExtractor;
import com.provectus.kafka.ui.service.rbac.extractor.LdapAuthorityExtractor;
import com.provectus.kafka.ui.service.rbac.extractor.ProviderAuthorityExtractor; import com.provectus.kafka.ui.service.rbac.extractor.ProviderAuthorityExtractor;
import jakarta.annotation.PostConstruct; import jakarta.annotation.PostConstruct;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Objects;
import java.util.Set; import java.util.Set;
import java.util.function.Predicate; import java.util.function.Predicate;
import java.util.regex.Pattern; import java.util.regex.Pattern;
@ -34,6 +35,7 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils; import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.core.env.Environment;
import org.springframework.security.access.AccessDeniedException; import org.springframework.security.access.AccessDeniedException;
import org.springframework.security.core.context.ReactiveSecurityContextHolder; import org.springframework.security.core.context.ReactiveSecurityContextHolder;
import org.springframework.security.core.context.SecurityContext; import org.springframework.security.core.context.SecurityContext;
@ -50,10 +52,11 @@ public class AccessControlService {
@Nullable @Nullable
private final InMemoryReactiveClientRegistrationRepository clientRegistrationRepository; private final InMemoryReactiveClientRegistrationRepository clientRegistrationRepository;
private final RoleBasedAccessControlProperties properties;
private final Environment environment;
private boolean rbacEnabled = false; private boolean rbacEnabled = false;
private Set<ProviderAuthorityExtractor> extractors = Collections.emptySet(); private Set<ProviderAuthorityExtractor> oauthExtractors = Collections.emptySet();
private final RoleBasedAccessControlProperties properties;
@PostConstruct @PostConstruct
public void init() { public void init() {
@ -63,21 +66,26 @@ public class AccessControlService {
} }
rbacEnabled = true; rbacEnabled = true;
this.extractors = properties.getRoles() this.oauthExtractors = properties.getRoles()
.stream() .stream()
.map(role -> role.getSubjects() .map(role -> role.getSubjects()
.stream() .stream()
.map(provider -> switch (provider.getProvider()) { .map(Subject::getProvider)
.distinct()
.map(provider -> switch (provider) {
case OAUTH_COGNITO -> new CognitoAuthorityExtractor(); case OAUTH_COGNITO -> new CognitoAuthorityExtractor();
case OAUTH_GOOGLE -> new GoogleAuthorityExtractor(); case OAUTH_GOOGLE -> new GoogleAuthorityExtractor();
case OAUTH_GITHUB -> new GithubAuthorityExtractor(); case OAUTH_GITHUB -> new GithubAuthorityExtractor();
case LDAP, LDAP_AD -> new LdapAuthorityExtractor(); default -> null;
}).collect(Collectors.toSet())) })
.filter(Objects::nonNull)
.collect(Collectors.toSet()))
.flatMap(Set::stream) .flatMap(Set::stream)
.collect(Collectors.toSet()); .collect(Collectors.toSet());
if ((clientRegistrationRepository == null || !clientRegistrationRepository.iterator().hasNext()) if (!properties.getRoles().isEmpty()
&& !properties.getRoles().isEmpty()) { && "oauth2".equalsIgnoreCase(environment.getProperty("auth.type"))
&& (clientRegistrationRepository == null || !clientRegistrationRepository.iterator().hasNext())) {
log.error("Roles are configured but no authentication methods are present. Authentication might fail."); log.error("Roles are configured but no authentication methods are present. Authentication might fail.");
} }
} }
@ -354,8 +362,8 @@ public class AccessControlService {
return isAccessible(Resource.KSQL, null, user, context, requiredActions); return isAccessible(Resource.KSQL, null, user, context, requiredActions);
} }
public Set<ProviderAuthorityExtractor> getExtractors() { public Set<ProviderAuthorityExtractor> getOauthExtractors() {
return extractors; return oauthExtractors;
} }
public List<Role> getRoles() { public List<Role> getRoles() {

View file

@ -1,23 +0,0 @@
package com.provectus.kafka.ui.service.rbac.extractor;
import com.provectus.kafka.ui.service.rbac.AccessControlService;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Mono;
@Slf4j
public class LdapAuthorityExtractor implements ProviderAuthorityExtractor {
@Override
public boolean isApplicable(String provider) {
return false; // TODO #2752
}
@Override
public Mono<Set<String>> extract(AccessControlService acs, Object value, Map<String, Object> additionalParams) {
return Mono.just(Collections.emptySet()); // TODO #2752
}
}

View file

@ -0,0 +1,70 @@
package com.provectus.kafka.ui.service.rbac.extractor;
import com.provectus.kafka.ui.config.auth.LdapProperties;
import com.provectus.kafka.ui.model.rbac.Role;
import com.provectus.kafka.ui.model.rbac.provider.Provider;
import com.provectus.kafka.ui.service.rbac.AccessControlService;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.ApplicationContext;
import org.springframework.ldap.core.DirContextOperations;
import org.springframework.ldap.core.support.BaseLdapPathContextSource;
import org.springframework.security.core.GrantedAuthority;
import org.springframework.security.core.authority.SimpleGrantedAuthority;
import org.springframework.security.ldap.userdetails.DefaultLdapAuthoritiesPopulator;
import org.springframework.util.Assert;
@Slf4j
public class RbacLdapAuthoritiesExtractor extends DefaultLdapAuthoritiesPopulator {
private final AccessControlService acs;
private final LdapProperties props;
private final Function<Map<String, List<String>>, GrantedAuthority> authorityMapper = (record) -> {
String role = record.get(getGroupRoleAttribute()).get(0);
return new SimpleGrantedAuthority(role);
};
public RbacLdapAuthoritiesExtractor(ApplicationContext context) {
super(context.getBean(BaseLdapPathContextSource.class), null);
this.acs = context.getBean(AccessControlService.class);
this.props = context.getBean(LdapProperties.class);
}
@Override
public Set<GrantedAuthority> getAdditionalRoles(DirContextOperations user, String username) {
return acs.getRoles()
.stream()
.map(Role::getSubjects)
.flatMap(List::stream)
.filter(s -> s.getProvider().equals(Provider.LDAP))
.filter(s -> s.getType().equals("group"))
.flatMap(subject -> getRoles(subject.getValue(), user.getNameInNamespace(), username).stream())
.collect(Collectors.toSet());
}
private Set<GrantedAuthority> getRoles(String groupSearchBase, String userDn, String username) {
Assert.notNull(groupSearchBase, "groupSearchBase is empty");
log.trace(
"Searching for roles for user [{}] with DN [{}], groupRoleAttribute [{}] and filter [{}] in search base [{}]",
username, userDn, props.getGroupRoleAttribute(), getGroupSearchFilter(), groupSearchBase);
var ldapTemplate = getLdapTemplate();
ldapTemplate.setIgnoreNameNotFoundException(true);
Set<Map<String, List<String>>> userRoles = ldapTemplate.searchForMultipleAttributeValues(
groupSearchBase, getGroupSearchFilter(), new String[] {userDn, username},
new String[] {props.getGroupRoleAttribute()});
return userRoles.stream()
.map(authorityMapper)
.peek(a -> log.debug("Mapped role [{}] for user [{}]", a, username))
.collect(Collectors.toSet());
}
}

View file

@ -2,6 +2,7 @@ package com.provectus.kafka.ui.util;
import com.provectus.kafka.ui.config.ClustersProperties; import com.provectus.kafka.ui.config.ClustersProperties;
import com.provectus.kafka.ui.config.WebclientProperties;
import com.provectus.kafka.ui.config.auth.OAuthProperties; import com.provectus.kafka.ui.config.auth.OAuthProperties;
import com.provectus.kafka.ui.config.auth.RoleBasedAccessControlProperties; import com.provectus.kafka.ui.config.auth.RoleBasedAccessControlProperties;
import com.provectus.kafka.ui.exception.FileUploadException; import com.provectus.kafka.ui.exception.FileUploadException;
@ -97,6 +98,7 @@ public class DynamicConfigOperations {
.type(ctx.getEnvironment().getProperty("auth.type")) .type(ctx.getEnvironment().getProperty("auth.type"))
.oauth2(getNullableBean(OAuthProperties.class)) .oauth2(getNullableBean(OAuthProperties.class))
.build()) .build())
.webclient(getNullableBean(WebclientProperties.class))
.build(); .build();
} }
@ -204,6 +206,7 @@ public class DynamicConfigOperations {
private ClustersProperties kafka; private ClustersProperties kafka;
private RoleBasedAccessControlProperties rbac; private RoleBasedAccessControlProperties rbac;
private Auth auth; private Auth auth;
private WebclientProperties webclient;
@Data @Data
@Builder @Builder
@ -222,6 +225,9 @@ public class DynamicConfigOperations {
Optional.ofNullable(auth) Optional.ofNullable(auth)
.flatMap(a -> Optional.ofNullable(a.oauth2)) .flatMap(a -> Optional.ofNullable(a.oauth2))
.ifPresent(OAuthProperties::validate); .ifPresent(OAuthProperties::validate);
Optional.ofNullable(webclient)
.ifPresent(WebclientProperties::validate);
} }
} }

View file

@ -4,9 +4,9 @@ import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
// Specifies field that can contain any kind of value - primitive, complex and nulls // Specifies field that can contain any kind of value - primitive, complex and nulls
public class AnyFieldSchema implements FieldSchema { class AnyFieldSchema implements FieldSchema {
public static AnyFieldSchema get() { static AnyFieldSchema get() {
return new AnyFieldSchema(); return new AnyFieldSchema();
} }

View file

@ -4,10 +4,10 @@ import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode; import com.fasterxml.jackson.databind.node.ObjectNode;
public class ArrayFieldSchema implements FieldSchema { class ArrayFieldSchema implements FieldSchema {
private final FieldSchema itemsSchema; private final FieldSchema itemsSchema;
public ArrayFieldSchema(FieldSchema itemsSchema) { ArrayFieldSchema(FieldSchema itemsSchema) {
this.itemsSchema = itemsSchema; this.itemsSchema = itemsSchema;
} }

View file

@ -7,10 +7,10 @@ import java.util.List;
import java.util.Map; import java.util.Map;
public class EnumJsonType extends JsonType { class EnumJsonType extends JsonType {
private final List<String> values; private final List<String> values;
public EnumJsonType(List<String> values) { EnumJsonType(List<String> values) {
super(Type.ENUM); super(Type.ENUM);
this.values = values; this.values = values;
} }

View file

@ -3,6 +3,6 @@ package com.provectus.kafka.ui.util.jsonschema;
import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
public interface FieldSchema { interface FieldSchema {
JsonNode toJsonNode(ObjectMapper mapper); JsonNode toJsonNode(ObjectMapper mapper);
} }

View file

@ -4,7 +4,7 @@ import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.Map; import java.util.Map;
public abstract class JsonType { abstract class JsonType {
protected final Type type; protected final Type type;
@ -12,13 +12,13 @@ public abstract class JsonType {
this.type = type; this.type = type;
} }
public Type getType() { Type getType() {
return type; return type;
} }
public abstract Map<String, JsonNode> toJsonNode(ObjectMapper mapper); abstract Map<String, JsonNode> toJsonNode(ObjectMapper mapper);
public enum Type { enum Type {
NULL, NULL,
BOOLEAN, BOOLEAN,
OBJECT, OBJECT,

View file

@ -2,21 +2,27 @@ package com.provectus.kafka.ui.util.jsonschema;
import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.BooleanNode;
import com.fasterxml.jackson.databind.node.ObjectNode; import com.fasterxml.jackson.databind.node.ObjectNode;
import com.fasterxml.jackson.databind.node.TextNode; import com.fasterxml.jackson.databind.node.TextNode;
import javax.annotation.Nullable;
public class MapFieldSchema implements FieldSchema { class MapFieldSchema implements FieldSchema {
private final FieldSchema itemSchema; private final @Nullable FieldSchema itemSchema;
public MapFieldSchema(FieldSchema itemSchema) { MapFieldSchema(@Nullable FieldSchema itemSchema) {
this.itemSchema = itemSchema; this.itemSchema = itemSchema;
} }
MapFieldSchema() {
this(null);
}
@Override @Override
public JsonNode toJsonNode(ObjectMapper mapper) { public JsonNode toJsonNode(ObjectMapper mapper) {
final ObjectNode objectNode = mapper.createObjectNode(); final ObjectNode objectNode = mapper.createObjectNode();
objectNode.set("type", new TextNode(JsonType.Type.OBJECT.getName())); objectNode.set("type", new TextNode(JsonType.Type.OBJECT.getName()));
objectNode.set("additionalProperties", itemSchema.toJsonNode(mapper)); objectNode.set("additionalProperties", itemSchema != null ? itemSchema.toJsonNode(mapper) : BooleanNode.TRUE);
return objectNode; return objectNode;
} }
} }

View file

@ -9,24 +9,24 @@ import java.util.stream.Collectors;
import reactor.util.function.Tuple2; import reactor.util.function.Tuple2;
import reactor.util.function.Tuples; import reactor.util.function.Tuples;
public class ObjectFieldSchema implements FieldSchema { class ObjectFieldSchema implements FieldSchema {
public static final ObjectFieldSchema EMPTY = new ObjectFieldSchema(Map.of(), List.of()); static final ObjectFieldSchema EMPTY = new ObjectFieldSchema(Map.of(), List.of());
private final Map<String, FieldSchema> properties; private final Map<String, FieldSchema> properties;
private final List<String> required; private final List<String> required;
public ObjectFieldSchema(Map<String, FieldSchema> properties, ObjectFieldSchema(Map<String, FieldSchema> properties,
List<String> required) { List<String> required) {
this.properties = properties; this.properties = properties;
this.required = required; this.required = required;
} }
public Map<String, FieldSchema> getProperties() { Map<String, FieldSchema> getProperties() {
return properties; return properties;
} }
public List<String> getRequired() { List<String> getRequired() {
return required; return required;
} }

View file

@ -5,11 +5,10 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.List; import java.util.List;
import java.util.stream.Collectors; import java.util.stream.Collectors;
public class OneOfFieldSchema implements FieldSchema { class OneOfFieldSchema implements FieldSchema {
private final List<FieldSchema> schemaList; private final List<FieldSchema> schemaList;
public OneOfFieldSchema( OneOfFieldSchema(List<FieldSchema> schemaList) {
List<FieldSchema> schemaList) {
this.schemaList = schemaList; this.schemaList = schemaList;
} }

View file

@ -94,6 +94,9 @@ public class ProtobufSchemaConverter implements JsonSchemaConverter<Descriptors.
if (wellKnownTypeSchema.isPresent()) { if (wellKnownTypeSchema.isPresent()) {
return wellKnownTypeSchema.get(); return wellKnownTypeSchema.get();
} }
if (field.isMapField()) {
return new MapFieldSchema();
}
final JsonType jsonType = convertType(field); final JsonType jsonType = convertType(field);
FieldSchema fieldSchema; FieldSchema fieldSchema;
if (jsonType.getType().equals(JsonType.Type.OBJECT)) { if (jsonType.getType().equals(JsonType.Type.OBJECT)) {
@ -149,20 +152,15 @@ public class ProtobufSchemaConverter implements JsonSchemaConverter<Descriptors.
} }
private JsonType convertType(Descriptors.FieldDescriptor field) { private JsonType convertType(Descriptors.FieldDescriptor field) {
switch (field.getType()) { return switch (field.getType()) {
case INT32: case INT32, FIXED32, SFIXED32, SINT32 -> new SimpleJsonType(
case FIXED32:
case SFIXED32:
case SINT32:
return new SimpleJsonType(
JsonType.Type.INTEGER, JsonType.Type.INTEGER,
Map.of( Map.of(
"maximum", IntNode.valueOf(Integer.MAX_VALUE), "maximum", IntNode.valueOf(Integer.MAX_VALUE),
"minimum", IntNode.valueOf(Integer.MIN_VALUE) "minimum", IntNode.valueOf(Integer.MIN_VALUE)
) )
); );
case UINT32: case UINT32 -> new SimpleJsonType(
return new SimpleJsonType(
JsonType.Type.INTEGER, JsonType.Type.INTEGER,
Map.of( Map.of(
"maximum", LongNode.valueOf(UnsignedInteger.MAX_VALUE.longValue()), "maximum", LongNode.valueOf(UnsignedInteger.MAX_VALUE.longValue()),
@ -172,44 +170,29 @@ public class ProtobufSchemaConverter implements JsonSchemaConverter<Descriptors.
//TODO: actually all *64 types will be printed with quotes (as strings), //TODO: actually all *64 types will be printed with quotes (as strings),
// see JsonFormat::printSingleFieldValue for impl. This can cause problems when you copy-paste from messages // see JsonFormat::printSingleFieldValue for impl. This can cause problems when you copy-paste from messages
// table to `Produce` area - need to think if it is critical or not. // table to `Produce` area - need to think if it is critical or not.
case INT64: case INT64, FIXED64, SFIXED64, SINT64 -> new SimpleJsonType(
case FIXED64:
case SFIXED64:
case SINT64:
return new SimpleJsonType(
JsonType.Type.INTEGER, JsonType.Type.INTEGER,
Map.of( Map.of(
"maximum", LongNode.valueOf(Long.MAX_VALUE), "maximum", LongNode.valueOf(Long.MAX_VALUE),
"minimum", LongNode.valueOf(Long.MIN_VALUE) "minimum", LongNode.valueOf(Long.MIN_VALUE)
) )
); );
case UINT64: case UINT64 -> new SimpleJsonType(
return new SimpleJsonType(
JsonType.Type.INTEGER, JsonType.Type.INTEGER,
Map.of( Map.of(
"maximum", new BigIntegerNode(UnsignedLong.MAX_VALUE.bigIntegerValue()), "maximum", new BigIntegerNode(UnsignedLong.MAX_VALUE.bigIntegerValue()),
"minimum", LongNode.valueOf(0) "minimum", LongNode.valueOf(0)
) )
); );
case MESSAGE: case MESSAGE, GROUP -> new SimpleJsonType(JsonType.Type.OBJECT);
case GROUP: case ENUM -> new EnumJsonType(
return new SimpleJsonType(JsonType.Type.OBJECT);
case ENUM:
return new EnumJsonType(
field.getEnumType().getValues().stream() field.getEnumType().getValues().stream()
.map(Descriptors.EnumValueDescriptor::getName) .map(Descriptors.EnumValueDescriptor::getName)
.collect(Collectors.toList()) .collect(Collectors.toList())
); );
case BYTES: case BYTES, STRING -> new SimpleJsonType(JsonType.Type.STRING);
case STRING: case FLOAT, DOUBLE -> new SimpleJsonType(JsonType.Type.NUMBER);
return new SimpleJsonType(JsonType.Type.STRING); case BOOL -> new SimpleJsonType(JsonType.Type.BOOLEAN);
case FLOAT: };
case DOUBLE:
return new SimpleJsonType(JsonType.Type.NUMBER);
case BOOL:
return new SimpleJsonType(JsonType.Type.BOOLEAN);
default:
return new SimpleJsonType(JsonType.Type.STRING);
}
} }
} }

View file

@ -4,10 +4,10 @@ import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.TextNode; import com.fasterxml.jackson.databind.node.TextNode;
public class RefFieldSchema implements FieldSchema { class RefFieldSchema implements FieldSchema {
private final String ref; private final String ref;
public RefFieldSchema(String ref) { RefFieldSchema(String ref) {
this.ref = ref; this.ref = ref;
} }
@ -16,7 +16,7 @@ public class RefFieldSchema implements FieldSchema {
return mapper.createObjectNode().set("$ref", new TextNode(ref)); return mapper.createObjectNode().set("$ref", new TextNode(ref));
} }
public String getRef() { String getRef() {
return ref; return ref;
} }
} }

View file

@ -3,10 +3,10 @@ package com.provectus.kafka.ui.util.jsonschema;
import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
public class SimpleFieldSchema implements FieldSchema { class SimpleFieldSchema implements FieldSchema {
private final JsonType type; private final JsonType type;
public SimpleFieldSchema(JsonType type) { SimpleFieldSchema(JsonType type) {
this.type = type; this.type = type;
} }

View file

@ -6,15 +6,15 @@ import com.fasterxml.jackson.databind.node.TextNode;
import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMap;
import java.util.Map; import java.util.Map;
public class SimpleJsonType extends JsonType { class SimpleJsonType extends JsonType {
private final Map<String, JsonNode> additionalTypeProperties; private final Map<String, JsonNode> additionalTypeProperties;
public SimpleJsonType(Type type) { SimpleJsonType(Type type) {
this(type, Map.of()); this(type, Map.of());
} }
public SimpleJsonType(Type type, Map<String, JsonNode> additionalTypeProperties) { SimpleJsonType(Type type, Map<String, JsonNode> additionalTypeProperties) {
super(type); super(type);
this.additionalTypeProperties = additionalTypeProperties; this.additionalTypeProperties = additionalTypeProperties;
} }

View file

@ -0,0 +1,83 @@
package com.provectus.kafka.ui.model;
import static org.assertj.core.api.Assertions.assertThat;
import com.provectus.kafka.ui.service.ReactiveAdminClient;
import java.math.BigDecimal;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartitionInfo;
import org.assertj.core.data.Percentage;
import org.junit.jupiter.api.Test;
class PartitionDistributionStatsTest {
@Test
void skewCalculatedBasedOnPartitionsCounts() {
Node n1 = new Node(1, "n1", 9092);
Node n2 = new Node(2, "n2", 9092);
Node n3 = new Node(3, "n3", 9092);
Node n4 = new Node(4, "n4", 9092);
var stats = PartitionDistributionStats.create(
Statistics.builder()
.clusterDescription(
new ReactiveAdminClient.ClusterDescription(null, "test", Set.of(n1, n2, n3), null))
.topicDescriptions(
Map.of(
"t1", new TopicDescription(
"t1", false,
List.of(
new TopicPartitionInfo(0, n1, List.of(n1, n2), List.of(n1, n2)),
new TopicPartitionInfo(1, n2, List.of(n2, n3), List.of(n2, n3))
)
),
"t2", new TopicDescription(
"t2", false,
List.of(
new TopicPartitionInfo(0, n1, List.of(n1, n2), List.of(n1, n2)),
new TopicPartitionInfo(1, null, List.of(n2, n1), List.of(n1))
)
)
)
)
.build(), 4
);
assertThat(stats.getPartitionLeaders())
.containsExactlyInAnyOrderEntriesOf(Map.of(n1, 2, n2, 1));
assertThat(stats.getPartitionsCount())
.containsExactlyInAnyOrderEntriesOf(Map.of(n1, 3, n2, 4, n3, 1));
assertThat(stats.getInSyncPartitions())
.containsExactlyInAnyOrderEntriesOf(Map.of(n1, 3, n2, 3, n3, 1));
// Node(partitions): n1(3), n2(4), n3(1), n4(0)
// average partitions cnt = (3+4+1) / 3 = 2.666 (counting only nodes with partitions!)
assertThat(stats.getAvgPartitionsPerBroker())
.isCloseTo(2.666, Percentage.withPercentage(1));
assertThat(stats.partitionsSkew(n1))
.isCloseTo(BigDecimal.valueOf(12.5), Percentage.withPercentage(1));
assertThat(stats.partitionsSkew(n2))
.isCloseTo(BigDecimal.valueOf(50), Percentage.withPercentage(1));
assertThat(stats.partitionsSkew(n3))
.isCloseTo(BigDecimal.valueOf(-62.5), Percentage.withPercentage(1));
assertThat(stats.partitionsSkew(n4))
.isCloseTo(BigDecimal.valueOf(-100), Percentage.withPercentage(1));
// Node(leaders): n1(2), n2(1), n3(0), n4(0)
// average leaders cnt = (2+1) / 2 = 1.5 (counting only nodes with leaders!)
assertThat(stats.leadersSkew(n1))
.isCloseTo(BigDecimal.valueOf(33.33), Percentage.withPercentage(1));
assertThat(stats.leadersSkew(n2))
.isCloseTo(BigDecimal.valueOf(-33.33), Percentage.withPercentage(1));
assertThat(stats.leadersSkew(n3))
.isCloseTo(BigDecimal.valueOf(-100), Percentage.withPercentage(1));
assertThat(stats.leadersSkew(n4))
.isCloseTo(BigDecimal.valueOf(-100), Percentage.withPercentage(1));
}
}

View file

@ -0,0 +1,53 @@
package com.provectus.kafka.ui.service.masking.policies;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import com.provectus.kafka.ui.config.ClustersProperties;
import com.provectus.kafka.ui.exception.ValidationException;
import java.util.List;
import org.junit.jupiter.api.Test;
class FieldsSelectorTest {
@Test
void selectsFieldsDueToProvidedPattern() {
var properties = new ClustersProperties.Masking();
properties.setFieldsNamePattern("f1|f2");
var selector = FieldsSelector.create(properties);
assertThat(selector.shouldBeMasked("f1")).isTrue();
assertThat(selector.shouldBeMasked("f2")).isTrue();
assertThat(selector.shouldBeMasked("doesNotMatchPattern")).isFalse();
}
@Test
void selectsFieldsDueToProvidedFieldNames() {
var properties = new ClustersProperties.Masking();
properties.setFields(List.of("f1", "f2"));
var selector = FieldsSelector.create(properties);
assertThat(selector.shouldBeMasked("f1")).isTrue();
assertThat(selector.shouldBeMasked("f2")).isTrue();
assertThat(selector.shouldBeMasked("notInAList")).isFalse();
}
@Test
void selectAllFieldsIfNoPatternAndNoNamesProvided() {
var properties = new ClustersProperties.Masking();
var selector = FieldsSelector.create(properties);
assertThat(selector.shouldBeMasked("anyPropertyName")).isTrue();
}
@Test
void throwsExceptionIfBothFieldListAndPatternProvided() {
var properties = new ClustersProperties.Masking();
properties.setFieldsNamePattern("f1|f2");
properties.setFields(List.of("f3", "f4"));
assertThatThrownBy(() -> FieldsSelector.create(properties))
.isInstanceOf(ValidationException.class);
}
}

View file

@ -15,35 +15,35 @@ import org.junit.jupiter.params.provider.MethodSource;
class MaskTest { class MaskTest {
private static final List<String> TARGET_FIELDS = List.of("id", "name"); private static final FieldsSelector FIELDS_SELECTOR = fieldName -> List.of("id", "name").contains(fieldName);
private static final List<String> PATTERN = List.of("X", "x", "n", "-"); private static final List<String> PATTERN = List.of("X", "x", "n", "-");
@ParameterizedTest @ParameterizedTest
@MethodSource @MethodSource
void testApplyToJsonContainer(List<String> fields, ContainerNode<?> original, ContainerNode<?> expected) { void testApplyToJsonContainer(FieldsSelector selector, ContainerNode<?> original, ContainerNode<?> expected) {
Mask policy = new Mask(fields, PATTERN); Mask policy = new Mask(selector, PATTERN);
assertThat(policy.applyToJsonContainer(original)).isEqualTo(expected); assertThat(policy.applyToJsonContainer(original)).isEqualTo(expected);
} }
private static Stream<Arguments> testApplyToJsonContainer() { private static Stream<Arguments> testApplyToJsonContainer() {
return Stream.of( return Stream.of(
Arguments.of( Arguments.of(
TARGET_FIELDS, FIELDS_SELECTOR,
parse("{ \"id\": 123, \"name\": { \"first\": \"James\", \"surname\": \"Bond777!\"}}"), parse("{ \"id\": 123, \"name\": { \"first\": \"James\", \"surname\": \"Bond777!\"}}"),
parse("{ \"id\": \"nnn\", \"name\": { \"first\": \"Xxxxx\", \"surname\": \"Xxxxnnn-\"}}") parse("{ \"id\": \"nnn\", \"name\": { \"first\": \"Xxxxx\", \"surname\": \"Xxxxnnn-\"}}")
), ),
Arguments.of( Arguments.of(
TARGET_FIELDS, FIELDS_SELECTOR,
parse("[{ \"id\": 123, \"f2\": 234}, { \"name\": \"1.2\", \"f2\": 345} ]"), parse("[{ \"id\": 123, \"f2\": 234}, { \"name\": \"1.2\", \"f2\": 345} ]"),
parse("[{ \"id\": \"nnn\", \"f2\": 234}, { \"name\": \"n-n\", \"f2\": 345} ]") parse("[{ \"id\": \"nnn\", \"f2\": 234}, { \"name\": \"n-n\", \"f2\": 345} ]")
), ),
Arguments.of( Arguments.of(
TARGET_FIELDS, FIELDS_SELECTOR,
parse("{ \"outer\": { \"f1\": \"James\", \"name\": \"Bond777!\"}}"), parse("{ \"outer\": { \"f1\": \"James\", \"name\": \"Bond777!\"}}"),
parse("{ \"outer\": { \"f1\": \"James\", \"name\": \"Xxxxnnn-\"}}") parse("{ \"outer\": { \"f1\": \"James\", \"name\": \"Xxxxnnn-\"}}")
), ),
Arguments.of( Arguments.of(
List.of(), (FieldsSelector) (fieldName -> true),
parse("{ \"outer\": { \"f1\": \"James\", \"name\": \"Bond777!\"}}"), parse("{ \"outer\": { \"f1\": \"James\", \"name\": \"Bond777!\"}}"),
parse("{ \"outer\": { \"f1\": \"Xxxxx\", \"name\": \"Xxxxnnn-\"}}") parse("{ \"outer\": { \"f1\": \"Xxxxx\", \"name\": \"Xxxxnnn-\"}}")
) )
@ -57,7 +57,7 @@ class MaskTest {
"null, xxxx" "null, xxxx"
}) })
void testApplyToString(String original, String expected) { void testApplyToString(String original, String expected) {
Mask policy = new Mask(List.of(), PATTERN); Mask policy = new Mask(fieldName -> true, PATTERN);
assertThat(policy.applyToString(original)).isEqualTo(expected); assertThat(policy.applyToString(original)).isEqualTo(expected);
} }

View file

@ -15,39 +15,39 @@ import org.junit.jupiter.params.provider.MethodSource;
class RemoveTest { class RemoveTest {
private static final List<String> TARGET_FIELDS = List.of("id", "name"); private static final FieldsSelector FIELDS_SELECTOR = fieldName -> List.of("id", "name").contains(fieldName);
@ParameterizedTest @ParameterizedTest
@MethodSource @MethodSource
void testApplyToJsonContainer(List<String> fields, ContainerNode<?> original, ContainerNode<?> expected) { void testApplyToJsonContainer(FieldsSelector fieldsSelector, ContainerNode<?> original, ContainerNode<?> expected) {
var policy = new Remove(fields); var policy = new Remove(fieldsSelector);
assertThat(policy.applyToJsonContainer(original)).isEqualTo(expected); assertThat(policy.applyToJsonContainer(original)).isEqualTo(expected);
} }
private static Stream<Arguments> testApplyToJsonContainer() { private static Stream<Arguments> testApplyToJsonContainer() {
return Stream.of( return Stream.of(
Arguments.of( Arguments.of(
TARGET_FIELDS, FIELDS_SELECTOR,
parse("{ \"id\": 123, \"name\": { \"first\": \"James\", \"surname\": \"Bond777!\"}}"), parse("{ \"id\": 123, \"name\": { \"first\": \"James\", \"surname\": \"Bond777!\"}}"),
parse("{}") parse("{}")
), ),
Arguments.of( Arguments.of(
TARGET_FIELDS, FIELDS_SELECTOR,
parse("[{ \"id\": 123, \"f2\": 234}, { \"name\": \"1.2\", \"f2\": 345} ]"), parse("[{ \"id\": 123, \"f2\": 234}, { \"name\": \"1.2\", \"f2\": 345} ]"),
parse("[{ \"f2\": 234}, { \"f2\": 345} ]") parse("[{ \"f2\": 234}, { \"f2\": 345} ]")
), ),
Arguments.of( Arguments.of(
TARGET_FIELDS, FIELDS_SELECTOR,
parse("{ \"outer\": { \"f1\": \"James\", \"name\": \"Bond777!\"}}"), parse("{ \"outer\": { \"f1\": \"James\", \"name\": \"Bond777!\"}}"),
parse("{ \"outer\": { \"f1\": \"James\"}}") parse("{ \"outer\": { \"f1\": \"James\"}}")
), ),
Arguments.of( Arguments.of(
List.of(), (FieldsSelector) (fieldName -> true),
parse("{ \"outer\": { \"f1\": \"v1\", \"f2\": \"v2\", \"inner\" : {\"if1\": \"iv1\"}}}"), parse("{ \"outer\": { \"f1\": \"v1\", \"f2\": \"v2\", \"inner\" : {\"if1\": \"iv1\"}}}"),
parse("{}") parse("{}")
), ),
Arguments.of( Arguments.of(
List.of(), (FieldsSelector) (fieldName -> true),
parse("[{ \"f1\": 123}, { \"f2\": \"1.2\"} ]"), parse("[{ \"f1\": 123}, { \"f2\": \"1.2\"} ]"),
parse("[{}, {}]") parse("[{}, {}]")
) )
@ -66,7 +66,7 @@ class RemoveTest {
"null, null" "null, null"
}) })
void testApplyToString(String original, String expected) { void testApplyToString(String original, String expected) {
var policy = new Remove(List.of()); var policy = new Remove(fieldName -> true);
assertThat(policy.applyToString(original)).isEqualTo(expected); assertThat(policy.applyToString(original)).isEqualTo(expected);
} }
} }

View file

@ -15,35 +15,35 @@ import org.junit.jupiter.params.provider.MethodSource;
class ReplaceTest { class ReplaceTest {
private static final List<String> TARGET_FIELDS = List.of("id", "name"); private static final FieldsSelector FIELDS_SELECTOR = fieldName -> List.of("id", "name").contains(fieldName);
private static final String REPLACEMENT_STRING = "***"; private static final String REPLACEMENT_STRING = "***";
@ParameterizedTest @ParameterizedTest
@MethodSource @MethodSource
void testApplyToJsonContainer(List<String> fields, ContainerNode<?> original, ContainerNode<?> expected) { void testApplyToJsonContainer(FieldsSelector fieldsSelector, ContainerNode<?> original, ContainerNode<?> expected) {
var policy = new Replace(fields, REPLACEMENT_STRING); var policy = new Replace(fieldsSelector, REPLACEMENT_STRING);
assertThat(policy.applyToJsonContainer(original)).isEqualTo(expected); assertThat(policy.applyToJsonContainer(original)).isEqualTo(expected);
} }
private static Stream<Arguments> testApplyToJsonContainer() { private static Stream<Arguments> testApplyToJsonContainer() {
return Stream.of( return Stream.of(
Arguments.of( Arguments.of(
TARGET_FIELDS, FIELDS_SELECTOR,
parse("{ \"id\": 123, \"name\": { \"first\": \"James\", \"surname\": \"Bond777!\"}}"), parse("{ \"id\": 123, \"name\": { \"first\": \"James\", \"surname\": \"Bond777!\"}}"),
parse("{ \"id\": \"***\", \"name\": { \"first\": \"***\", \"surname\": \"***\"}}") parse("{ \"id\": \"***\", \"name\": { \"first\": \"***\", \"surname\": \"***\"}}")
), ),
Arguments.of( Arguments.of(
TARGET_FIELDS, FIELDS_SELECTOR,
parse("[{ \"id\": 123, \"f2\": 234}, { \"name\": \"1.2\", \"f2\": 345} ]"), parse("[{ \"id\": 123, \"f2\": 234}, { \"name\": \"1.2\", \"f2\": 345} ]"),
parse("[{ \"id\": \"***\", \"f2\": 234}, { \"name\": \"***\", \"f2\": 345} ]") parse("[{ \"id\": \"***\", \"f2\": 234}, { \"name\": \"***\", \"f2\": 345} ]")
), ),
Arguments.of( Arguments.of(
TARGET_FIELDS, FIELDS_SELECTOR,
parse("{ \"outer\": { \"f1\": \"James\", \"name\": \"Bond777!\"}}"), parse("{ \"outer\": { \"f1\": \"James\", \"name\": \"Bond777!\"}}"),
parse("{ \"outer\": { \"f1\": \"James\", \"name\": \"***\"}}") parse("{ \"outer\": { \"f1\": \"James\", \"name\": \"***\"}}")
), ),
Arguments.of( Arguments.of(
List.of(), (FieldsSelector) (fieldName -> true),
parse("{ \"outer\": { \"f1\": \"v1\", \"f2\": \"v2\", \"inner\" : {\"if1\": \"iv1\"}}}"), parse("{ \"outer\": { \"f1\": \"v1\", \"f2\": \"v2\", \"inner\" : {\"if1\": \"iv1\"}}}"),
parse("{ \"outer\": { \"f1\": \"***\", \"f2\": \"***\", \"inner\" : {\"if1\": \"***\"}}}}") parse("{ \"outer\": { \"f1\": \"***\", \"f2\": \"***\", \"inner\" : {\"if1\": \"***\"}}}}")
) )
@ -62,7 +62,7 @@ class ReplaceTest {
"null, ***" "null, ***"
}) })
void testApplyToString(String original, String expected) { void testApplyToString(String original, String expected) {
var policy = new Replace(List.of(), REPLACEMENT_STRING); var policy = new Replace(fieldName -> true, REPLACEMENT_STRING);
assertThat(policy.applyToString(original)).isEqualTo(expected); assertThat(policy.applyToString(original)).isEqualTo(expected);
} }
} }

View file

@ -59,8 +59,10 @@ class ProtobufSchemaConverterTest {
TestMsg outer_ref = 2; TestMsg outer_ref = 2;
EmbeddedMsg self_ref = 3; EmbeddedMsg self_ref = 3;
} }
}""";
map<int32, string> intToStringMap = 21;
map<string, EmbeddedMsg> strToObjMap = 22;
}""";
String expectedJsonSchema = """ String expectedJsonSchema = """
{ {
@ -109,7 +111,9 @@ class ProtobufSchemaConverterTest {
"v2": { "type": [ "number", "string", "object", "array", "boolean", "null" ] }, "v2": { "type": [ "number", "string", "object", "array", "boolean", "null" ] },
"uint32_w_field": { "type": "integer", "maximum": 4294967295, "minimum": 0 }, "uint32_w_field": { "type": "integer", "maximum": 4294967295, "minimum": 0 },
"bool_w_field": { "type": "boolean" }, "bool_w_field": { "type": "boolean" },
"uint64_w_field": { "type": "integer", "maximum": 18446744073709551615, "minimum": 0 } "uint64_w_field": { "type": "integer", "maximum": 18446744073709551615, "minimum": 0 },
"strToObjMap": { "type": "object", "additionalProperties": true },
"intToStringMap": { "type": "object", "additionalProperties": true }
} }
}, },
"test.TestMsg.EmbeddedMsg": { "test.TestMsg.EmbeddedMsg": {

View file

@ -2375,6 +2375,16 @@ components:
type: number type: number
bytesOutPerSec: bytesOutPerSec:
type: number type: number
partitionsLeader:
type: integer
partitions:
type: integer
inSyncPartitions:
type: integer
partitionsSkew:
type: number
leadersSkew:
type: number
required: required:
- id - id
@ -2441,6 +2451,7 @@ components:
- MEMBERS - MEMBERS
- STATE - STATE
- MESSAGES_BEHIND - MESSAGES_BEHIND
- TOPIC_NUM
ConsumerGroupsPageResponse: ConsumerGroupsPageResponse:
type: object type: object
@ -3467,6 +3478,12 @@ components:
type: array type: array
items: items:
$ref: '#/components/schemas/Action' $ref: '#/components/schemas/Action'
webclient:
type: object
properties:
maxInMemoryBufferSize:
type: string
description: "examples: 20, 12KB, 5MB"
kafka: kafka:
type: object type: object
properties: properties:
@ -3479,6 +3496,14 @@ components:
type: integer type: integer
noDataEmptyPolls: noDataEmptyPolls:
type: integer type: integer
maxPageSize:
type: integer
defaultPageSize:
type: integer
adminClientTimeout:
type: integer
internalTopicPrefix:
type: string
clusters: clusters:
type: array type: array
items: items:
@ -3607,7 +3632,9 @@ components:
type: array type: array
items: items:
type: string type: string
pattern: fieldsNamePattern:
type: string
maskingCharsReplacement:
type: array type: array
items: items:
type: string type: string

View file

@ -36,29 +36,31 @@ import org.springframework.web.reactive.function.client.WebClientResponseExcepti
@Slf4j @Slf4j
public class ApiService extends BaseSource { public class ApiService extends BaseSource {
private final ApiClient apiClient = new ApiClient().setBasePath(BASE_API_URL);
@SneakyThrows @SneakyThrows
private TopicsApi topicApi() { private TopicsApi topicApi() {
return new TopicsApi(new ApiClient().setBasePath(BASE_API_URL)); return new TopicsApi(apiClient);
} }
@SneakyThrows @SneakyThrows
private SchemasApi schemaApi() { private SchemasApi schemaApi() {
return new SchemasApi(new ApiClient().setBasePath(BASE_API_URL)); return new SchemasApi(apiClient);
} }
@SneakyThrows @SneakyThrows
private KafkaConnectApi connectorApi() { private KafkaConnectApi connectorApi() {
return new KafkaConnectApi(new ApiClient().setBasePath(BASE_API_URL)); return new KafkaConnectApi(apiClient);
} }
@SneakyThrows @SneakyThrows
private MessagesApi messageApi() { private MessagesApi messageApi() {
return new MessagesApi(new ApiClient().setBasePath(BASE_API_URL)); return new MessagesApi(apiClient);
} }
@SneakyThrows @SneakyThrows
private KsqlApi ksqlApi() { private KsqlApi ksqlApi() {
return new KsqlApi(new ApiClient().setBasePath(BASE_API_URL)); return new KsqlApi(apiClient);
} }
@SneakyThrows @SneakyThrows

View file

@ -1,6 +1,8 @@
package com.provectus.kafka.ui.manualsuite.backlog; package com.provectus.kafka.ui.manualsuite.backlog;
import static com.provectus.kafka.ui.qasesuite.BaseQaseTest.BROKERS_SUITE_ID; import static com.provectus.kafka.ui.qasesuite.BaseQaseTest.BROKERS_SUITE_ID;
import static com.provectus.kafka.ui.qasesuite.BaseQaseTest.KSQL_DB_SUITE_ID;
import static com.provectus.kafka.ui.qasesuite.BaseQaseTest.SCHEMAS_SUITE_ID;
import static com.provectus.kafka.ui.qasesuite.BaseQaseTest.TOPICS_PROFILE_SUITE_ID; import static com.provectus.kafka.ui.qasesuite.BaseQaseTest.TOPICS_PROFILE_SUITE_ID;
import static com.provectus.kafka.ui.utilities.qase.enums.State.TO_BE_AUTOMATED; import static com.provectus.kafka.ui.utilities.qase.enums.State.TO_BE_AUTOMATED;
@ -20,30 +22,72 @@ public class SmokeBacklog extends BaseManualTest {
} }
@Automation(state = TO_BE_AUTOMATED) @Automation(state = TO_BE_AUTOMATED)
@Suite(id = BROKERS_SUITE_ID) @Suite(id = KSQL_DB_SUITE_ID)
@QaseId(331) @QaseId(277)
@Test @Test
public void testCaseB() { public void testCaseB() {
} }
@Automation(state = TO_BE_AUTOMATED) @Automation(state = TO_BE_AUTOMATED)
@Suite(id = BROKERS_SUITE_ID) @Suite(id = BROKERS_SUITE_ID)
@QaseId(332) @QaseId(331)
@Test @Test
public void testCaseC() { public void testCaseC() {
} }
@Automation(state = TO_BE_AUTOMATED) @Automation(state = TO_BE_AUTOMATED)
@Suite(id = TOPICS_PROFILE_SUITE_ID) @Suite(id = BROKERS_SUITE_ID)
@QaseId(335) @QaseId(332)
@Test @Test
public void testCaseD() { public void testCaseD() {
} }
@Automation(state = TO_BE_AUTOMATED) @Automation(state = TO_BE_AUTOMATED)
@Suite(id = TOPICS_PROFILE_SUITE_ID) @Suite(id = TOPICS_PROFILE_SUITE_ID)
@QaseId(336) @QaseId(335)
@Test @Test
public void testCaseE() { public void testCaseE() {
} }
@Automation(state = TO_BE_AUTOMATED)
@Suite(id = TOPICS_PROFILE_SUITE_ID)
@QaseId(336)
@Test
public void testCaseF() {
}
@Automation(state = TO_BE_AUTOMATED)
@Suite(id = TOPICS_PROFILE_SUITE_ID)
@QaseId(343)
@Test
public void testCaseG() {
}
@Automation(state = TO_BE_AUTOMATED)
@Suite(id = KSQL_DB_SUITE_ID)
@QaseId(344)
@Test
public void testCaseH() {
}
@Automation(state = TO_BE_AUTOMATED)
@Suite(id = SCHEMAS_SUITE_ID)
@QaseId(345)
@Test
public void testCaseI() {
}
@Automation(state = TO_BE_AUTOMATED)
@Suite(id = SCHEMAS_SUITE_ID)
@QaseId(346)
@Test
public void testCaseJ() {
}
@Automation(state = TO_BE_AUTOMATED)
@Suite(id = TOPICS_PROFILE_SUITE_ID)
@QaseId(347)
@Test
public void testCaseK() {
}
} }

View file

@ -92,4 +92,28 @@ public class TopicsTest extends BaseManualTest {
@Test @Test
public void testCaseN() { public void testCaseN() {
} }
@Automation(state = NOT_AUTOMATED)
@QaseId(337)
@Test
public void testCaseO() {
}
@Automation(state = NOT_AUTOMATED)
@QaseId(339)
@Test
public void testCaseP() {
}
@Automation(state = NOT_AUTOMATED)
@QaseId(341)
@Test
public void testCaseQ() {
}
@Automation(state = NOT_AUTOMATED)
@QaseId(342)
@Test
public void testCaseR() {
}
} }

View file

@ -14,4 +14,16 @@ public class WizardTest extends BaseManualTest {
@Test @Test
public void testCaseA() { public void testCaseA() {
} }
@Automation(state = NOT_AUTOMATED)
@QaseId(338)
@Test
public void testCaseB() {
}
@Automation(state = NOT_AUTOMATED)
@QaseId(340)
@Test
public void testCaseC() {
}
} }

View file

@ -486,11 +486,7 @@ public class TopicsTest extends BaseTest {
topicDetails topicDetails
.waitUntilScreenReady(); .waitUntilScreenReady();
TOPIC_LIST.add(topicToCopy); TOPIC_LIST.add(topicToCopy);
SoftAssert softly = new SoftAssert(); Assert.assertTrue(topicDetails.isTopicHeaderVisible(topicToCopy.getName()), "isTopicHeaderVisible()");
softly.assertTrue(topicDetails.isAlertWithMessageVisible(SUCCESS, "Topic successfully created."),
"isAlertWithMessageVisible()");
softly.assertTrue(topicDetails.isTopicHeaderVisible(topicToCopy.getName()), "isTopicHeaderVisible()");
softly.assertAll();
} }
@AfterClass(alwaysRun = true) @AfterClass(alwaysRun = true)

View file

@ -1,26 +1,41 @@
import React from 'react'; import React from 'react';
import { FullConnectorInfo } from 'generated-sources'; import {
Action,
ConnectorAction,
ConnectorState,
FullConnectorInfo,
ResourceType,
} from 'generated-sources';
import { CellContext } from '@tanstack/react-table'; import { CellContext } from '@tanstack/react-table';
import { ClusterNameRoute } from 'lib/paths'; import { ClusterNameRoute } from 'lib/paths';
import useAppParams from 'lib/hooks/useAppParams'; import useAppParams from 'lib/hooks/useAppParams';
import { Dropdown, DropdownItem } from 'components/common/Dropdown'; import { Dropdown, DropdownItem } from 'components/common/Dropdown';
import { useDeleteConnector } from 'lib/hooks/api/kafkaConnect'; import {
useDeleteConnector,
useUpdateConnectorState,
} from 'lib/hooks/api/kafkaConnect';
import { useConfirm } from 'lib/hooks/useConfirm'; import { useConfirm } from 'lib/hooks/useConfirm';
import { useIsMutating } from '@tanstack/react-query';
import { ActionDropdownItem } from 'components/common/ActionComponent';
const ActionsCell: React.FC<CellContext<FullConnectorInfo, unknown>> = ({ const ActionsCell: React.FC<CellContext<FullConnectorInfo, unknown>> = ({
row, row,
}) => { }) => {
const { connect, name } = row.original; const { connect, name, status } = row.original;
const { clusterName } = useAppParams<ClusterNameRoute>(); const { clusterName } = useAppParams<ClusterNameRoute>();
const mutationsNumber = useIsMutating();
const isMutating = mutationsNumber > 0;
const confirm = useConfirm(); const confirm = useConfirm();
const deleteMutation = useDeleteConnector({ const deleteMutation = useDeleteConnector({
clusterName, clusterName,
connectName: connect, connectName: connect,
connectorName: name, connectorName: name,
}); });
const stateMutation = useUpdateConnectorState({
clusterName,
connectName: connect,
connectorName: name,
});
const handleDelete = () => { const handleDelete = () => {
confirm( confirm(
<> <>
@ -31,8 +46,66 @@ const ActionsCell: React.FC<CellContext<FullConnectorInfo, unknown>> = ({
} }
); );
}; };
// const stateMutation = useUpdateConnectorState(routerProps);
const resumeConnectorHandler = () =>
stateMutation.mutateAsync(ConnectorAction.RESUME);
const restartConnectorHandler = () =>
stateMutation.mutateAsync(ConnectorAction.RESTART);
const restartAllTasksHandler = () =>
stateMutation.mutateAsync(ConnectorAction.RESTART_ALL_TASKS);
const restartFailedTasksHandler = () =>
stateMutation.mutateAsync(ConnectorAction.RESTART_FAILED_TASKS);
return ( return (
<Dropdown> <Dropdown>
{status.state === ConnectorState.PAUSED && (
<ActionDropdownItem
onClick={resumeConnectorHandler}
disabled={isMutating}
permission={{
resource: ResourceType.CONNECT,
action: Action.EDIT,
value: name,
}}
>
Resume
</ActionDropdownItem>
)}
<ActionDropdownItem
onClick={restartConnectorHandler}
disabled={isMutating}
permission={{
resource: ResourceType.CONNECT,
action: Action.EDIT,
value: name,
}}
>
Restart Connector
</ActionDropdownItem>
<ActionDropdownItem
onClick={restartAllTasksHandler}
disabled={isMutating}
permission={{
resource: ResourceType.CONNECT,
action: Action.EDIT,
value: name,
}}
>
Restart All Tasks
</ActionDropdownItem>
<ActionDropdownItem
onClick={restartFailedTasksHandler}
disabled={isMutating}
permission={{
resource: ResourceType.CONNECT,
action: Action.EDIT,
value: name,
}}
>
Restart Failed Tasks
</ActionDropdownItem>
<DropdownItem onClick={handleDelete} danger> <DropdownItem onClick={handleDelete} danger>
Remove Connector Remove Connector
</DropdownItem> </DropdownItem>

View file

@ -9,7 +9,11 @@ import { screen, waitFor } from '@testing-library/react';
import userEvent from '@testing-library/user-event'; import userEvent from '@testing-library/user-event';
import { render, WithRoute } from 'lib/testHelpers'; import { render, WithRoute } from 'lib/testHelpers';
import { clusterConnectConnectorPath, clusterConnectorsPath } from 'lib/paths'; import { clusterConnectConnectorPath, clusterConnectorsPath } from 'lib/paths';
import { useConnectors, useDeleteConnector } from 'lib/hooks/api/kafkaConnect'; import {
useConnectors,
useDeleteConnector,
useUpdateConnectorState,
} from 'lib/hooks/api/kafkaConnect';
const mockedUsedNavigate = jest.fn(); const mockedUsedNavigate = jest.fn();
const mockDelete = jest.fn(); const mockDelete = jest.fn();
@ -22,6 +26,7 @@ jest.mock('react-router-dom', () => ({
jest.mock('lib/hooks/api/kafkaConnect', () => ({ jest.mock('lib/hooks/api/kafkaConnect', () => ({
useConnectors: jest.fn(), useConnectors: jest.fn(),
useDeleteConnector: jest.fn(), useDeleteConnector: jest.fn(),
useUpdateConnectorState: jest.fn(),
})); }));
const clusterName = 'local'; const clusterName = 'local';
@ -42,6 +47,10 @@ describe('Connectors List', () => {
(useConnectors as jest.Mock).mockImplementation(() => ({ (useConnectors as jest.Mock).mockImplementation(() => ({
data: connectors, data: connectors,
})); }));
const restartConnector = jest.fn();
(useUpdateConnectorState as jest.Mock).mockImplementation(() => ({
mutateAsync: restartConnector,
}));
}); });
it('renders', async () => { it('renders', async () => {

View file

@ -51,9 +51,9 @@ const List = () => {
accessorKey: 'members', accessorKey: 'members',
}, },
{ {
id: ConsumerGroupOrdering.TOPIC_NUM,
header: 'Num Of Topics', header: 'Num Of Topics',
accessorKey: 'topics', accessorKey: 'topics',
enableSorting: false,
}, },
{ {
id: ConsumerGroupOrdering.MESSAGES_BEHIND, id: ConsumerGroupOrdering.MESSAGES_BEHIND,

View file

@ -1,4 +1,5 @@
import styled from 'styled-components'; import styled from 'styled-components';
import { Button } from 'components/common/Button/Button';
export const DiffWrapper = styled.div` export const DiffWrapper = styled.div`
align-items: stretch; align-items: stretch;
@ -81,3 +82,6 @@ export const DiffTile = styled.div`
export const DiffVersionsSelect = styled.div` export const DiffVersionsSelect = styled.div`
width: 0.625em; width: 0.625em;
`; `;
export const BackButton = styled(Button)`
margin: 10px 9px;
`;

View file

@ -20,6 +20,7 @@ import useAppParams from 'lib/hooks/useAppParams';
import PageHeading from 'components/common/PageHeading/PageHeading'; import PageHeading from 'components/common/PageHeading/PageHeading';
import * as S from './Diff.styled'; import * as S from './Diff.styled';
import { BackButton } from './Diff.styled';
export interface DiffProps { export interface DiffProps {
versions: SchemaSubject[]; versions: SchemaSubject[];
@ -77,6 +78,13 @@ const Diff: React.FC<DiffProps> = ({ versions, areVersionsFetched }) => {
backText="Schema Registry" backText="Schema Registry"
backTo={clusterSchemasPath(clusterName)} backTo={clusterSchemasPath(clusterName)}
/> />
<BackButton
buttonType="secondary"
buttonSize="S"
onClick={() => navigate(-1)}
>
Back
</BackButton>
<S.Section> <S.Section>
{areVersionsFetched ? ( {areVersionsFetched ? (
<S.DiffBox> <S.DiffBox>

View file

@ -3,6 +3,7 @@ import Diff, { DiffProps } from 'components/Schemas/Diff/Diff';
import { render, WithRoute } from 'lib/testHelpers'; import { render, WithRoute } from 'lib/testHelpers';
import { screen } from '@testing-library/react'; import { screen } from '@testing-library/react';
import { clusterSchemaComparePath } from 'lib/paths'; import { clusterSchemaComparePath } from 'lib/paths';
import userEvent from '@testing-library/user-event';
import { versions } from './fixtures'; import { versions } from './fixtures';
@ -142,4 +143,24 @@ describe('Diff', () => {
expect(select).toHaveTextContent(versions[0].version); expect(select).toHaveTextContent(versions[0].version);
}); });
}); });
describe('Back button', () => {
beforeEach(() => {
setupComponent({
areVersionsFetched: true,
versions,
});
});
it('back button is appear', () => {
const backButton = screen.getAllByRole('button', { name: 'Back' });
expect(backButton[0]).toBeInTheDocument();
});
it('click on back button', () => {
const backButton = screen.getAllByRole('button', { name: 'Back' });
userEvent.click(backButton[0]);
expect(screen.queryByRole('Back')).not.toBeInTheDocument();
});
});
}); });

View file

@ -4,11 +4,7 @@ import { CellContext } from '@tanstack/react-table';
import ClusterContext from 'components/contexts/ClusterContext'; import ClusterContext from 'components/contexts/ClusterContext';
import { ClusterNameRoute } from 'lib/paths'; import { ClusterNameRoute } from 'lib/paths';
import useAppParams from 'lib/hooks/useAppParams'; import useAppParams from 'lib/hooks/useAppParams';
import { import { Dropdown, DropdownItemHint } from 'components/common/Dropdown';
Dropdown,
DropdownItem,
DropdownItemHint,
} from 'components/common/Dropdown';
import { import {
useDeleteTopic, useDeleteTopic,
useClearTopicMessages, useClearTopicMessages,
@ -55,7 +51,8 @@ const ActionsCell: React.FC<CellContext<Topic, unknown>> = ({ row }) => {
with DELETE policy with DELETE policy
</DropdownItemHint> </DropdownItemHint>
</ActionDropdownItem> </ActionDropdownItem>
<DropdownItem <ActionDropdownItem
disabled={!isTopicDeletionAllowed}
onClick={recreateTopic.mutateAsync} onClick={recreateTopic.mutateAsync}
confirm={ confirm={
<> <>
@ -63,9 +60,14 @@ const ActionsCell: React.FC<CellContext<Topic, unknown>> = ({ row }) => {
</> </>
} }
danger danger
permission={{
resource: ResourceType.TOPIC,
action: [Action.VIEW, Action.CREATE, Action.DELETE],
value: name,
}}
> >
Recreate Topic Recreate Topic
</DropdownItem> </ActionDropdownItem>
<ActionDropdownItem <ActionDropdownItem
disabled={!isTopicDeletionAllowed} disabled={!isTopicDeletionAllowed}
onClick={() => deleteTopic.mutateAsync(name)} onClick={() => deleteTopic.mutateAsync(name)}

View file

@ -27,7 +27,7 @@ export interface AddEditFilterContainerProps {
inputDisplayNameDefaultValue?: string; inputDisplayNameDefaultValue?: string;
inputCodeDefaultValue?: string; inputCodeDefaultValue?: string;
isAdd?: boolean; isAdd?: boolean;
submitCallback?: (values: AddMessageFilters) => Promise<void>; submitCallback?: (values: AddMessageFilters) => void;
} }
const AddEditFilterContainer: React.FC<AddEditFilterContainerProps> = ({ const AddEditFilterContainer: React.FC<AddEditFilterContainerProps> = ({

View file

@ -29,8 +29,10 @@ const Message: React.FC<Props> = ({
timestampType, timestampType,
offset, offset,
key, key,
keySize,
partition, partition,
content, content,
valueSize,
headers, headers,
valueSerde, valueSerde,
keySerde, keySerde,
@ -138,6 +140,10 @@ const Message: React.FC<Props> = ({
headers={headers} headers={headers}
timestamp={timestamp} timestamp={timestamp}
timestampType={timestampType} timestampType={timestampType}
keySize={keySize}
contentSize={valueSize}
keySerde={keySerde}
valueSerde={valueSerde}
/> />
)} )}
</> </>

View file

@ -3,7 +3,6 @@ import EditorViewer from 'components/common/EditorViewer/EditorViewer';
import BytesFormatted from 'components/common/BytesFormatted/BytesFormatted'; import BytesFormatted from 'components/common/BytesFormatted/BytesFormatted';
import { SchemaType, TopicMessageTimestampTypeEnum } from 'generated-sources'; import { SchemaType, TopicMessageTimestampTypeEnum } from 'generated-sources';
import { formatTimestamp } from 'lib/dateTimeHelpers'; import { formatTimestamp } from 'lib/dateTimeHelpers';
import { useSearchParams } from 'react-router-dom';
import * as S from './MessageContent.styled'; import * as S from './MessageContent.styled';
@ -15,6 +14,10 @@ export interface MessageContentProps {
headers?: { [key: string]: string | undefined }; headers?: { [key: string]: string | undefined };
timestamp?: Date; timestamp?: Date;
timestampType?: TopicMessageTimestampTypeEnum; timestampType?: TopicMessageTimestampTypeEnum;
keySize?: number;
contentSize?: number;
keySerde?: string;
valueSerde?: string;
} }
const MessageContent: React.FC<MessageContentProps> = ({ const MessageContent: React.FC<MessageContentProps> = ({
@ -23,12 +26,12 @@ const MessageContent: React.FC<MessageContentProps> = ({
headers, headers,
timestamp, timestamp,
timestampType, timestampType,
keySize,
contentSize,
keySerde,
valueSerde,
}) => { }) => {
const [activeTab, setActiveTab] = React.useState<Tab>('content'); const [activeTab, setActiveTab] = React.useState<Tab>('content');
const [searchParams] = useSearchParams();
const keyFormat = searchParams.get('keySerde') || '';
const valueFormat = searchParams.get('valueSerde') || '';
const activeTabContent = () => { const activeTabContent = () => {
switch (activeTab) { switch (activeTab) {
case 'content': case 'content':
@ -54,8 +57,7 @@ const MessageContent: React.FC<MessageContentProps> = ({
e.preventDefault(); e.preventDefault();
setActiveTab('headers'); setActiveTab('headers');
}; };
const keySize = new TextEncoder().encode(messageKey).length;
const contentSize = new TextEncoder().encode(messageContent).length;
const contentType = const contentType =
messageContent && messageContent.trim().startsWith('{') messageContent && messageContent.trim().startsWith('{')
? SchemaType.JSON ? SchemaType.JSON
@ -107,7 +109,7 @@ const MessageContent: React.FC<MessageContentProps> = ({
<S.Metadata> <S.Metadata>
<S.MetadataLabel>Key Serde</S.MetadataLabel> <S.MetadataLabel>Key Serde</S.MetadataLabel>
<span> <span>
<S.MetadataValue>{keyFormat}</S.MetadataValue> <S.MetadataValue>{keySerde}</S.MetadataValue>
<S.MetadataMeta> <S.MetadataMeta>
Size: <BytesFormatted value={keySize} /> Size: <BytesFormatted value={keySize} />
</S.MetadataMeta> </S.MetadataMeta>
@ -117,7 +119,7 @@ const MessageContent: React.FC<MessageContentProps> = ({
<S.Metadata> <S.Metadata>
<S.MetadataLabel>Value Serde</S.MetadataLabel> <S.MetadataLabel>Value Serde</S.MetadataLabel>
<span> <span>
<S.MetadataValue>{valueFormat}</S.MetadataValue> <S.MetadataValue>{valueSerde}</S.MetadataValue>
<S.MetadataMeta> <S.MetadataMeta>
Size: <BytesFormatted value={contentSize} /> Size: <BytesFormatted value={contentSize} />
</S.MetadataMeta> </S.MetadataMeta>

View file

@ -20,6 +20,8 @@ const setupWrapper = (props?: Partial<MessageContentProps>) => {
headers={{ header: 'test' }} headers={{ header: 'test' }}
timestamp={new Date(0)} timestamp={new Date(0)}
timestampType={TopicMessageTimestampTypeEnum.CREATE_TIME} timestampType={TopicMessageTimestampTypeEnum.CREATE_TIME}
keySerde="SchemaRegistry"
valueSerde="Avro"
{...props} {...props}
/> />
</tbody> </tbody>
@ -27,42 +29,20 @@ const setupWrapper = (props?: Partial<MessageContentProps>) => {
); );
}; };
const proto =
'syntax = "proto3";\npackage com.provectus;\n\nmessage TestProtoRecord {\n string f1 = 1;\n int32 f2 = 2;\n}\n';
global.TextEncoder = TextEncoder; global.TextEncoder = TextEncoder;
const searchParamsContentAVRO = new URLSearchParams({
keySerde: 'SchemaRegistry',
valueSerde: 'AVRO',
limit: '100',
});
const searchParamsContentJSON = new URLSearchParams({
keySerde: 'SchemaRegistry',
valueSerde: 'JSON',
limit: '100',
});
const searchParamsContentPROTOBUF = new URLSearchParams({
keySerde: 'SchemaRegistry',
valueSerde: 'PROTOBUF',
limit: '100',
});
describe('MessageContent screen', () => { describe('MessageContent screen', () => {
beforeEach(() => { beforeEach(() => {
render(setupWrapper(), { render(setupWrapper());
initialEntries: [`/messages?${searchParamsContentAVRO}`],
});
}); });
describe('renders', () => { describe('Checking keySerde and valueSerde', () => {
it('key format in document', () => { it('keySerde in document', () => {
expect(screen.getByText('SchemaRegistry')).toBeInTheDocument(); expect(screen.getByText('SchemaRegistry')).toBeInTheDocument();
}); });
it('content format in document', () => { it('valueSerde in document', () => {
expect(screen.getByText('AVRO')).toBeInTheDocument(); expect(screen.getByText('Avro')).toBeInTheDocument();
}); });
}); });
@ -98,42 +78,3 @@ describe('MessageContent screen', () => {
}); });
}); });
}); });
describe('checking content type depend on message type', () => {
it('renders component with message having JSON type', () => {
render(
setupWrapper({
messageContent: '{"data": "test"}',
}),
{ initialEntries: [`/messages?${searchParamsContentJSON}`] }
);
expect(screen.getByText('JSON')).toBeInTheDocument();
});
it('renders component with message having AVRO type', () => {
render(
setupWrapper({
messageContent: '{"data": "test"}',
}),
{ initialEntries: [`/messages?${searchParamsContentAVRO}`] }
);
expect(screen.getByText('AVRO')).toBeInTheDocument();
});
it('renders component with message having PROTOBUF type', () => {
render(
setupWrapper({
messageContent: proto,
}),
{ initialEntries: [`/messages?${searchParamsContentPROTOBUF}`] }
);
expect(screen.getByText('PROTOBUF')).toBeInTheDocument();
});
it('renders component with message having no type which is equal to having PROTOBUF type', () => {
render(
setupWrapper({
messageContent: '',
}),
{ initialEntries: [`/messages?${searchParamsContentPROTOBUF}`] }
);
expect(screen.getByText('PROTOBUF')).toBeInTheDocument();
});
});

View file

@ -8,15 +8,29 @@ export const Wrapper = styled.div`
export const Columns = styled.div` export const Columns = styled.div`
margin: -0.75rem; margin: -0.75rem;
margin-bottom: 0.75rem; margin-bottom: 0.75rem;
display: flex;
flex-direction: column;
padding: 0.75rem;
gap: 8px;
@media screen and (min-width: 769px) { @media screen and (min-width: 769px) {
display: flex; display: flex;
} }
`; `;
export const Flex = styled.div`
export const Column = styled.div` display: flex;
flex-basis: 0; flex-direction: row;
flex-grow: 1; gap: 8px;
flex-shrink: 1; @media screen and (max-width: 1200px) {
padding: 0.75rem; flex-direction: column;
}
`;
export const FlexItem = styled.div`
width: 18rem;
@media screen and (max-width: 1450px) {
width: 50%;
}
@media screen and (max-width: 1200px) {
width: 100%;
}
`; `;

View file

@ -4,6 +4,7 @@ import { RouteParamsClusterTopic } from 'lib/paths';
import { Button } from 'components/common/Button/Button'; import { Button } from 'components/common/Button/Button';
import Editor from 'components/common/Editor/Editor'; import Editor from 'components/common/Editor/Editor';
import Select, { SelectOption } from 'components/common/Select/Select'; import Select, { SelectOption } from 'components/common/Select/Select';
import Switch from 'components/common/Switch/Switch';
import useAppParams from 'lib/hooks/useAppParams'; import useAppParams from 'lib/hooks/useAppParams';
import { showAlert } from 'lib/errorHandling'; import { showAlert } from 'lib/errorHandling';
import { useSendMessage, useTopicDetails } from 'lib/hooks/api/topics'; import { useSendMessage, useTopicDetails } from 'lib/hooks/api/topics';
@ -26,9 +27,12 @@ interface FormType {
partition: number; partition: number;
keySerde: string; keySerde: string;
valueSerde: string; valueSerde: string;
keepContents: boolean;
} }
const SendMessage: React.FC<{ onSubmit: () => void }> = ({ onSubmit }) => { const SendMessage: React.FC<{ closeSidebar: () => void }> = ({
closeSidebar,
}) => {
const { clusterName, topicName } = useAppParams<RouteParamsClusterTopic>(); const { clusterName, topicName } = useAppParams<RouteParamsClusterTopic>();
const { data: topic } = useTopicDetails({ clusterName, topicName }); const { data: topic } = useTopicDetails({ clusterName, topicName });
const { data: serdes = {} } = useSerdes({ const { data: serdes = {} } = useSerdes({
@ -47,11 +51,13 @@ const SendMessage: React.FC<{ onSubmit: () => void }> = ({ onSubmit }) => {
handleSubmit, handleSubmit,
formState: { isSubmitting }, formState: { isSubmitting },
control, control,
setValue,
} = useForm<FormType>({ } = useForm<FormType>({
mode: 'onChange', mode: 'onChange',
defaultValues: { defaultValues: {
...defaultValues, ...defaultValues,
partition: Number(partitionOptions[0].value), partition: Number(partitionOptions[0].value),
keepContents: false,
}, },
}); });
@ -62,6 +68,7 @@ const SendMessage: React.FC<{ onSubmit: () => void }> = ({ onSubmit }) => {
content, content,
headers, headers,
partition, partition,
keepContents,
}: FormType) => { }: FormType) => {
let errors: string[] = []; let errors: string[] = [];
@ -110,7 +117,11 @@ const SendMessage: React.FC<{ onSubmit: () => void }> = ({ onSubmit }) => {
keySerde, keySerde,
valueSerde, valueSerde,
}); });
onSubmit(); if (!keepContents) {
setValue('key', '');
setValue('content', '');
closeSidebar();
}
} catch (e) { } catch (e) {
// do nothing // do nothing
} }
@ -120,7 +131,7 @@ const SendMessage: React.FC<{ onSubmit: () => void }> = ({ onSubmit }) => {
<S.Wrapper> <S.Wrapper>
<form onSubmit={handleSubmit(submit)}> <form onSubmit={handleSubmit(submit)}>
<S.Columns> <S.Columns>
<S.Column> <S.FlexItem>
<InputLabel>Partition</InputLabel> <InputLabel>Partition</InputLabel>
<Controller <Controller
control={control} control={control}
@ -137,8 +148,9 @@ const SendMessage: React.FC<{ onSubmit: () => void }> = ({ onSubmit }) => {
/> />
)} )}
/> />
</S.Column> </S.FlexItem>
<S.Column> <S.Flex>
<S.FlexItem>
<InputLabel>Key Serde</InputLabel> <InputLabel>Key Serde</InputLabel>
<Controller <Controller
control={control} control={control}
@ -155,8 +167,8 @@ const SendMessage: React.FC<{ onSubmit: () => void }> = ({ onSubmit }) => {
/> />
)} )}
/> />
</S.Column> </S.FlexItem>
<S.Column> <S.FlexItem>
<InputLabel>Value Serde</InputLabel> <InputLabel>Value Serde</InputLabel>
<Controller <Controller
control={control} control={control}
@ -173,11 +185,21 @@ const SendMessage: React.FC<{ onSubmit: () => void }> = ({ onSubmit }) => {
/> />
)} )}
/> />
</S.Column> </S.FlexItem>
</S.Flex>
<div>
<Controller
control={control}
name="keepContents"
render={({ field: { name, onChange, value } }) => (
<Switch name={name} onChange={onChange} checked={value} />
)}
/>
<InputLabel>Keep contents</InputLabel>
</div>
</S.Columns> </S.Columns>
<S.Columns> <S.Columns>
<S.Column> <div>
<InputLabel>Key</InputLabel> <InputLabel>Key</InputLabel>
<Controller <Controller
control={control} control={control}
@ -191,8 +213,8 @@ const SendMessage: React.FC<{ onSubmit: () => void }> = ({ onSubmit }) => {
/> />
)} )}
/> />
</S.Column> </div>
<S.Column> <div>
<InputLabel>Value</InputLabel> <InputLabel>Value</InputLabel>
<Controller <Controller
control={control} control={control}
@ -206,10 +228,10 @@ const SendMessage: React.FC<{ onSubmit: () => void }> = ({ onSubmit }) => {
/> />
)} )}
/> />
</S.Column> </div>
</S.Columns> </S.Columns>
<S.Columns> <S.Columns>
<S.Column> <div>
<InputLabel>Headers</InputLabel> <InputLabel>Headers</InputLabel>
<Controller <Controller
control={control} control={control}
@ -224,7 +246,7 @@ const SendMessage: React.FC<{ onSubmit: () => void }> = ({ onSubmit }) => {
/> />
)} )}
/> />
</S.Column> </div>
</S.Columns> </S.Columns>
<Button <Button
buttonSize="M" buttonSize="M"

View file

@ -49,7 +49,7 @@ const renderComponent = async () => {
const path = clusterTopicPath(clusterName, topicName); const path = clusterTopicPath(clusterName, topicName);
await render( await render(
<WithRoute path={clusterTopicPath()}> <WithRoute path={clusterTopicPath()}>
<SendMessage onSubmit={mockOnSubmit} /> <SendMessage closeSidebar={mockOnSubmit} />
</WithRoute>, </WithRoute>,
{ initialEntries: [path] } { initialEntries: [path] }
); );

View file

@ -59,7 +59,7 @@ const Topic: React.FC = () => {
const deleteTopicHandler = async () => { const deleteTopicHandler = async () => {
await deleteTopic.mutateAsync(topicName); await deleteTopic.mutateAsync(topicName);
navigate('../..'); navigate(clusterTopicsPath(clusterName));
}; };
React.useEffect(() => { React.useEffect(() => {
@ -236,7 +236,7 @@ const Topic: React.FC = () => {
title="Produce Message" title="Produce Message"
> >
<Suspense fallback={<PageLoader />}> <Suspense fallback={<PageLoader />}>
<SendMessage onSubmit={closeSidebar} /> <SendMessage closeSidebar={closeSidebar} />
</Suspense> </Suspense>
</SlidingSidebar> </SlidingSidebar>
</> </>

View file

@ -10,6 +10,7 @@ import {
clusterTopicMessagesPath, clusterTopicMessagesPath,
clusterTopicPath, clusterTopicPath,
clusterTopicSettingsPath, clusterTopicSettingsPath,
clusterTopicsPath,
clusterTopicStatisticsPath, clusterTopicStatisticsPath,
getNonExactPath, getNonExactPath,
} from 'lib/paths'; } from 'lib/paths';
@ -179,7 +180,9 @@ describe('Details', () => {
name: 'Confirm', name: 'Confirm',
}); });
await userEvent.click(submitDeleteButton); await userEvent.click(submitDeleteButton);
expect(mockNavigate).toHaveBeenCalledWith('../..'); expect(mockNavigate).toHaveBeenCalledWith(
clusterTopicsPath(mockClusterName)
);
}); });
it('shows a confirmation popup on deleting topic messages', async () => { it('shows a confirmation popup on deleting topic messages', async () => {

View file

@ -1,52 +1,38 @@
import React from 'react'; import React from 'react';
import WarningIcon from 'components/common/Icons/WarningIcon'; import WarningIcon from 'components/common/Icons/WarningIcon';
import { gitCommitPath } from 'lib/paths'; import { gitCommitPath } from 'lib/paths';
import { useActuatorInfo } from 'lib/hooks/api/actuatorInfo';
import { BUILD_VERSION_PATTERN } from 'lib/constants';
import { useLatestVersion } from 'lib/hooks/api/latestVersion'; import { useLatestVersion } from 'lib/hooks/api/latestVersion';
import { formatTimestamp } from 'lib/dateTimeHelpers'; import { formatTimestamp } from 'lib/dateTimeHelpers';
import * as S from './Version.styled'; import * as S from './Version.styled';
import compareVersions from './compareVersions';
const Version: React.FC = () => { const Version: React.FC = () => {
const { data: actuatorInfo = {} } = useActuatorInfo();
const { data: latestVersionInfo = {} } = useLatestVersion(); const { data: latestVersionInfo = {} } = useLatestVersion();
const { buildTime, commitId, isLatestRelease } = latestVersionInfo.build;
const tag = actuatorInfo?.build?.version; const { versionTag } = latestVersionInfo?.latestRelease || '';
const commit = actuatorInfo?.git?.commit.id;
const { tag_name: latestTag } = latestVersionInfo;
const outdated = compareVersions(tag, latestTag);
const currentVersion = tag?.match(BUILD_VERSION_PATTERN)
? tag
: formatTimestamp(actuatorInfo?.build?.time);
if (!tag) return null;
return ( return (
<S.Wrapper> <S.Wrapper>
{!!outdated && ( {!isLatestRelease && (
<S.OutdatedWarning <S.OutdatedWarning
title={`Your app version is outdated. Current latest version is ${latestTag}`} title={`Your app version is outdated. Current latest version is ${versionTag}`}
> >
<WarningIcon /> <WarningIcon />
</S.OutdatedWarning> </S.OutdatedWarning>
)} )}
{commit && ( {commitId && (
<div> <div>
<S.CurrentCommitLink <S.CurrentCommitLink
title="Current commit" title="Current commit"
target="__blank" target="__blank"
href={gitCommitPath(commit)} href={gitCommitPath(commitId)}
> >
{commit} {commitId}
</S.CurrentCommitLink> </S.CurrentCommitLink>
</div> </div>
)} )}
<S.CurrentVersion>{currentVersion}</S.CurrentVersion> <S.CurrentVersion>{formatTimestamp(buildTime)}</S.CurrentVersion>
</S.Wrapper> </S.Wrapper>
); );
}; };

View file

@ -2,87 +2,40 @@ import React from 'react';
import { screen } from '@testing-library/dom'; import { screen } from '@testing-library/dom';
import Version from 'components/Version/Version'; import Version from 'components/Version/Version';
import { render } from 'lib/testHelpers'; import { render } from 'lib/testHelpers';
import { formatTimestamp } from 'lib/dateTimeHelpers';
import { useActuatorInfo } from 'lib/hooks/api/actuatorInfo';
import { useLatestVersion } from 'lib/hooks/api/latestVersion'; import { useLatestVersion } from 'lib/hooks/api/latestVersion';
import { actuatorInfoPayload } from 'lib/fixtures/actuatorInfo'; import {
import { latestVersionPayload } from 'lib/fixtures/latestVersion'; deprecatedVersionPayload,
latestVersionPayload,
} from 'lib/fixtures/latestVersion';
jest.mock('lib/hooks/api/actuatorInfo', () => ({
useActuatorInfo: jest.fn(),
}));
jest.mock('lib/hooks/api/latestVersion', () => ({ jest.mock('lib/hooks/api/latestVersion', () => ({
useLatestVersion: jest.fn(), useLatestVersion: jest.fn(),
})); }));
describe('Version Component', () => { describe('Version Component', () => {
const versionTag = 'v0.5.0'; const commitId = '96a577a';
const snapshotTag = 'test-SNAPSHOT';
const commitTag = 'befd3b328e2c9c7df57b0c5746561b2f7fee8813';
const actuatorVersionPayload = actuatorInfoPayload(versionTag);
const formattedTimestamp = formatTimestamp(actuatorVersionPayload.build.time);
describe('render latest version', () => {
beforeEach(() => { beforeEach(() => {
(useActuatorInfo as jest.Mock).mockImplementation(() => ({
data: actuatorVersionPayload,
}));
(useLatestVersion as jest.Mock).mockImplementation(() => ({ (useLatestVersion as jest.Mock).mockImplementation(() => ({
data: latestVersionPayload, data: latestVersionPayload,
})); }));
}); });
it('renders latest release version as current version', async () => {
render(<Version />);
expect(screen.getByText(commitId)).toBeInTheDocument();
});
describe('tag does not exist', () => { it('should not show warning icon if it is last release', async () => {
it('does not render component', async () => { render(<Version />);
(useActuatorInfo as jest.Mock).mockImplementation(() => ({ expect(screen.queryByRole('img')).not.toBeInTheDocument();
data: null,
}));
const { container } = render(<Version />);
expect(container.firstChild).toBeEmptyDOMElement();
}); });
}); });
describe('renders current version', () => { it('show warning icon if it is not last release', async () => {
it('renders release build version as current version', async () => { (useLatestVersion as jest.Mock).mockImplementation(() => ({
render(<Version />); data: deprecatedVersionPayload,
expect(screen.getByText(versionTag)).toBeInTheDocument();
});
it('renders formatted timestamp as current version when version is commit', async () => {
(useActuatorInfo as jest.Mock).mockImplementation(() => ({
data: actuatorInfoPayload(commitTag),
})); }));
render(<Version />); render(<Version />);
expect(screen.getByText(formattedTimestamp)).toBeInTheDocument(); expect(screen.getByRole('img')).toBeInTheDocument();
});
it('renders formatted timestamp as current version when version contains -SNAPSHOT', async () => {
(useActuatorInfo as jest.Mock).mockImplementation(() => ({
data: actuatorInfoPayload(snapshotTag),
}));
render(<Version />);
expect(screen.getByText(formattedTimestamp)).toBeInTheDocument();
});
});
describe('outdated build version', () => {
it('renders warning message', async () => {
(useActuatorInfo as jest.Mock).mockImplementation(() => ({
data: actuatorInfoPayload('v0.3.0'),
}));
render(<Version />);
expect(
screen.getByTitle(
`Your app version is outdated. Current latest version is ${latestVersionPayload.tag_name}`
)
).toBeInTheDocument();
});
});
describe('current commit id with link', () => {
it('renders', async () => {
render(<Version />);
expect(
screen.getByText(actuatorVersionPayload.git.commit.id)
).toBeInTheDocument();
});
}); });
}); });

View file

@ -70,7 +70,7 @@ export const DropdownButton = styled.button`
`; `;
export const DangerItem = styled.div` export const DangerItem = styled.div`
color: ${({ theme: { dropdown } }) => dropdown.item.color.normal}; color: ${({ theme: { dropdown } }) => dropdown.item.color.danger};
`; `;
export const DropdownItemHint = styled.div` export const DropdownItemHint = styled.div`

View file

@ -13,6 +13,7 @@ const WarningIcon: React.FC = () => {
return ( return (
<WarningIconContainer> <WarningIconContainer>
<svg <svg
role="img"
width="14" width="14"
height="13" height="13"
viewBox="0 0 14 13" viewBox="0 0 14 13"

View file

@ -6,7 +6,7 @@ export const Wrapper = styled.div<{ $open?: boolean }>(
position: fixed; position: fixed;
top: ${theme.layout.navBarHeight}; top: ${theme.layout.navBarHeight};
bottom: 0; bottom: 0;
width: 60vw; width: 37vw;
right: calc(${$open ? '0px' : theme.layout.rightSidebarWidth} * -1); right: calc(${$open ? '0px' : theme.layout.rightSidebarWidth} * -1);
box-shadow: -1px 0px 10px 0px rgba(0, 0, 0, 0.2); box-shadow: -1px 0px 10px 0px rgba(0, 0, 0, 0.2);
transition: right 0.3s linear; transition: right 0.3s linear;

View file

@ -1,12 +0,0 @@
export const actuatorInfoPayload = (
version = 'befd3b328e2c9c7df57b0c5746561b2f7fee8813'
) => ({
git: { commit: { id: 'befd3b3' } },
build: {
artifact: 'kafka-ui-api',
name: 'kafka-ui-api',
time: '2022-09-15T09:52:21.753Z',
version,
group: 'com.provectus',
},
});

View file

@ -1,3 +1,16 @@
export const latestVersionPayload = { export const deprecatedVersionPayload = {
tag_name: 'v0.4.0', build: {
buildTime: '2023-04-14T09:47:35.463Z',
commitId: '96a577a',
isLatestRelease: false,
version: '96a577a98c6069376c5d22ed49cffd3739f1bbdc',
},
};
export const latestVersionPayload = {
build: {
buildTime: '2023-04-14T09:47:35.463Z',
commitId: '96a577a',
isLatestRelease: true,
version: '96a577a98c6069376c5d22ed49cffd3739f1bbdc',
},
}; };

View file

@ -1,17 +0,0 @@
import fetchMock from 'fetch-mock';
import * as hooks from 'lib/hooks/api/actuatorInfo';
import { expectQueryWorks, renderQueryHook } from 'lib/testHelpers';
import { actuatorInfoPayload } from 'lib/fixtures/actuatorInfo';
const actuatorInfoPath = '/actuator/info';
describe('Actuator info hooks', () => {
beforeEach(() => fetchMock.restore());
describe('useActuatorInfo', () => {
it('returns the correct data', async () => {
const mock = fetchMock.getOnce(actuatorInfoPath, actuatorInfoPayload());
const { result } = renderQueryHook(() => hooks.useActuatorInfo());
await expectQueryWorks(mock, result);
});
});
});

Some files were not shown because too many files have changed in this diff Show more