Compare commits: issues/408...master

70 commits:

83b5a60cc0, 3dc4446321, 53a6553765, fc97dfa874, 68f08a0c9b, cc12814a95, 5d5358010b, de2f06ccf8,
ff106a2061, c00cb320cd, 8a1e9ad8e8, 39bb860f8e, f66d234d83, 68a7268f8b, aca3d25dc8, 0616883fee,
59584ed369, bbb739af92, 145bf07b5d, ceb821acdf, d2b0cc51e3, 9e7bc02c8a, 2836b2f5d2, a47848f809,
5c9fb994a4, 14efe9da1e, 6676747606, b0583a3ca7, 4ec7975b2e, c05abc1e0a, 729ca79581, 80024c8758,
0d6f293ab9, 8f2a29d15d, 552691fc5d, 342b534ac9, 2051f6f653, b2b02a5d60, d7eb3ba99e, 7de883d3ab,
4519d9a48c, cca2c96997, 844eb17d7a, 37a6e62684, 4f211b39ba, 8d35761b8d, b12a0634a0, 8d402798c5,
ed9f91fd8a, d2a5acc82d, 7a82079471, 9acbf2b681, 5f89e3b97e, 1df8625fc8, c8ad262d77, bdbbdcccbe,
3114509ebf, 6224b12ed3, 78e53d7d93, f9e89661d7, 2a61b97fab, b32ab01436, fa9547b95a, d915de4fd8,
150fc21fb8, ba18f3b042, ac09efcd34, 333eae2475, 69ebd3d52b, 6a40146fb1
150 changed files with 4300 additions and 3264 deletions
.github/workflows/aws_publisher.yaml (vendored, 2 changes)

@@ -31,7 +31,7 @@ jobs:
echo "Packer will be triggered in this dir $WORK_DIR"

- name: Configure AWS credentials for Kafka-UI account
uses: aws-actions/configure-aws-credentials@v2
uses: aws-actions/configure-aws-credentials@v3
with:
aws-access-key-id: ${{ secrets.AWS_AMI_PUBLISH_KEY_ID }}
aws-secret-access-key: ${{ secrets.AWS_AMI_PUBLISH_KEY_SECRET }}

.github/workflows/branch-deploy.yml (vendored, 2 changes)

@@ -45,7 +45,7 @@ jobs:
restore-keys: |
${{ runner.os }}-buildx-
- name: Configure AWS credentials for Kafka-UI account
uses: aws-actions/configure-aws-credentials@v2
uses: aws-actions/configure-aws-credentials@v3
with:
aws-access-key-id: ${{ secrets.AWS_ACCESS_KEY_ID }}
aws-secret-access-key: ${{ secrets.AWS_SECRET_ACCESS_KEY }}

.github/workflows/build-public-image.yml (vendored, 2 changes)

@@ -42,7 +42,7 @@ jobs:
restore-keys: |
${{ runner.os }}-buildx-
- name: Configure AWS credentials for Kafka-UI account
uses: aws-actions/configure-aws-credentials@v2
uses: aws-actions/configure-aws-credentials@v3
with:
aws-access-key-id: ${{ secrets.AWS_ACCESS_KEY_ID }}
aws-secret-access-key: ${{ secrets.AWS_SECRET_ACCESS_KEY }}

.github/workflows/cve.yaml (vendored, 2 changes)

@@ -55,7 +55,7 @@ jobs:
cache-to: type=local,dest=/tmp/.buildx-cache

- name: Run CVE checks
uses: aquasecurity/trivy-action@0.11.2
uses: aquasecurity/trivy-action@0.12.0
with:
image-ref: "provectuslabs/kafka-ui:${{ steps.build.outputs.version }}"
format: "table"

.github/workflows/delete-public-image.yml (vendored, 2 changes)

@@ -15,7 +15,7 @@ jobs:
tag='${{ github.event.pull_request.number }}'
echo "tag=${tag}" >> $GITHUB_OUTPUT
- name: Configure AWS credentials for Kafka-UI account
uses: aws-actions/configure-aws-credentials@v2
uses: aws-actions/configure-aws-credentials@v3
with:
aws-access-key-id: ${{ secrets.AWS_ACCESS_KEY_ID }}
aws-secret-access-key: ${{ secrets.AWS_SECRET_ACCESS_KEY }}

.github/workflows/e2e-automation.yml (vendored, 2 changes)

@@ -24,7 +24,7 @@ jobs:
with:
ref: ${{ github.sha }}
- name: Configure AWS credentials
uses: aws-actions/configure-aws-credentials@v2
uses: aws-actions/configure-aws-credentials@v3
with:
aws-access-key-id: ${{ secrets.AWS_ACCESS_KEY_ID }}
aws-secret-access-key: ${{ secrets.AWS_SECRET_ACCESS_KEY }}

.github/workflows/e2e-checks.yaml (vendored, 2 changes)

@@ -18,7 +18,7 @@ jobs:
with:
ref: ${{ github.event.pull_request.head.sha }}
- name: Configure AWS credentials
uses: aws-actions/configure-aws-credentials@v2
uses: aws-actions/configure-aws-credentials@v3
with:
aws-access-key-id: ${{ secrets.S3_AWS_ACCESS_KEY_ID }}
aws-secret-access-key: ${{ secrets.S3_AWS_SECRET_ACCESS_KEY }}

.github/workflows/e2e-weekly.yml (vendored, 2 changes)

@@ -11,7 +11,7 @@ jobs:
with:
ref: ${{ github.sha }}
- name: Configure AWS credentials
uses: aws-actions/configure-aws-credentials@v2
uses: aws-actions/configure-aws-credentials@v3
with:
aws-access-key-id: ${{ secrets.AWS_ACCESS_KEY_ID }}
aws-secret-access-key: ${{ secrets.AWS_SECRET_ACCESS_KEY }}

.github/workflows/frontend.yaml (vendored, 10 changes)

@@ -23,13 +23,13 @@ jobs:
# Disabling shallow clone is recommended for improving relevancy of reporting
fetch-depth: 0
ref: ${{ github.event.pull_request.head.sha }}
- uses: pnpm/action-setup@v2.2.4
- uses: pnpm/action-setup@v2.4.0
with:
version: 7.4.0
version: 8.6.12
- name: Install node
uses: actions/setup-node@v3.7.0
uses: actions/setup-node@v3.8.1
with:
node-version: "16.15.0"
node-version: "18.17.1"
cache: "pnpm"
cache-dependency-path: "./kafka-ui-react-app/pnpm-lock.yaml"
- name: Install Node dependencies

@@ -49,7 +49,7 @@ jobs:
cd kafka-ui-react-app/
pnpm test:CI
- name: SonarCloud Scan
uses: workshur/sonarcloud-github-action@improved_basedir
uses: sonarsource/sonarcloud-github-action@master
with:
projectBaseDir: ./kafka-ui-react-app
args: -Dsonar.pullrequest.key=${{ github.event.pull_request.number }} -Dsonar.pullrequest.branch=${{ github.head_ref }} -Dsonar.pullrequest.base=${{ github.base_ref }}

.github/workflows/release.yaml (vendored, 2 changes)

@@ -34,7 +34,7 @@ jobs:
echo "version=${VERSION}" >> $GITHUB_OUTPUT

- name: Upload files to a GitHub release
uses: svenstaro/upload-release-action@2.6.1
uses: svenstaro/upload-release-action@2.7.0
with:
repo_token: ${{ secrets.GITHUB_TOKEN }}
file: kafka-ui-api/target/kafka-ui-api-${{ steps.build.outputs.version }}.jar

@@ -47,7 +47,7 @@ jobs:
restore-keys: |
${{ runner.os }}-buildx-
- name: Configure AWS credentials for Kafka-UI account
uses: aws-actions/configure-aws-credentials@v2
uses: aws-actions/configure-aws-credentials@v3
with:
aws-access-key-id: ${{ secrets.AWS_ACCESS_KEY_ID }}
aws-secret-access-key: ${{ secrets.AWS_SECRET_ACCESS_KEY }}

.github/workflows/terraform-deploy.yml (vendored, 2 changes)

@@ -26,7 +26,7 @@ jobs:
echo "Terraform will be triggered in this dir $TF_DIR"

- name: Configure AWS credentials for Kafka-UI account
uses: aws-actions/configure-aws-credentials@v2
uses: aws-actions/configure-aws-credentials@v3
with:
aws-access-key-id: ${{ secrets.AWS_ACCESS_KEY_ID }}
aws-secret-access-key: ${{ secrets.AWS_SECRET_ACCESS_KEY }}

@@ -91,7 +91,7 @@ docker run -it -p 8080:8080 -e DYNAMIC_CONFIG_ENABLED=true provectuslabs/kafka-u

Then access the web UI at [http://localhost:8080](http://localhost:8080)

The command is sufficient to try things out. When you're done trying things out, you can proceed with a [persistent installation](https://docs.kafka-ui.provectus.io/configuration/quick-start#persistent-start)
The command is sufficient to try things out. When you're done trying things out, you can proceed with a [persistent installation](https://docs.kafka-ui.provectus.io/quick-start/persistent-start)

## Persistent installation

@@ -1,7 +1,11 @@
#FROM azul/zulu-openjdk-alpine:17-jre-headless
FROM azul/zulu-openjdk-alpine@sha256:a36679ac0d28cb835e2a8c00e1e0d95509c6c51c5081c7782b85edb1f37a771a

RUN apk add --no-cache gcompat # need to make snappy codec work
RUN apk add --no-cache \
# snappy codec
gcompat \
# configuring timezones
tzdata
RUN addgroup -S kafkaui && adduser -S kafkaui -G kafkaui

# creating folder for dynamic config usage (certificates uploads, etc)

@@ -81,6 +81,12 @@
<groupId>io.confluent</groupId>
<artifactId>kafka-json-schema-serializer</artifactId>
<version>${confluent.version}</version>
<exclusions>
<exclusion>
<groupId>commons-collections</groupId>
<artifactId>commons-collections</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>io.confluent</groupId>

@@ -135,6 +141,11 @@
<artifactId>commons-pool2</artifactId>
<version>${apache.commons.version}</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-collections4</artifactId>
<version>4.4</version>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>testcontainers</artifactId>

@@ -238,8 +249,6 @@
<groupId>org.springframework.security</groupId>
<artifactId>spring-security-ldap</artifactId>
</dependency>


<dependency>
<groupId>org.codehaus.groovy</groupId>
<artifactId>groovy-jsr223</artifactId>

@@ -394,7 +403,7 @@
<plugin>
<groupId>pl.project13.maven</groupId>
<artifactId>git-commit-id-plugin</artifactId>
<version>4.0.0</version>
<version>4.9.10</version>
<executions>
<execution>
<id>get-the-git-infos</id>

@@ -35,38 +35,28 @@ public class ClustersProperties {
public static class Cluster {
String name;
String bootstrapServers;

TruststoreConfig ssl;

String schemaRegistry;
SchemaRegistryAuth schemaRegistryAuth;
KeystoreConfig schemaRegistrySsl;

String ksqldbServer;
KsqldbServerAuth ksqldbServerAuth;
KeystoreConfig ksqldbServerSsl;

List<ConnectCluster> kafkaConnect;

List<SerdeConfig> serde;
String defaultKeySerde;
String defaultValueSerde;

MetricsConfigData metrics;
Map<String, Object> properties;
boolean readOnly = false;
Long pollingThrottleRate;

List<SerdeConfig> serde;
String defaultKeySerde;
String defaultValueSerde;
List<Masking> masking;

Long pollingThrottleRate;
TruststoreConfig ssl;
AuditProperties audit;
}

@Data
public static class PollingProperties {
Integer pollTimeoutMs;
Integer partitionPollTimeout;
Integer noDataEmptyPolls;
Integer maxPageSize;
Integer defaultPageSize;
}

@@ -109,16 +99,6 @@ public class ClustersProperties {
public static class TruststoreConfig {
String truststoreLocation;
String truststorePassword;
boolean verifySsl = true;
}

@Data
@NoArgsConstructor
@AllArgsConstructor
@ToString(exclude = {"keystorePassword"})
public static class KeystoreConfig {
String keystoreLocation;
String keystorePassword;
}

@Data

@@ -138,6 +118,15 @@ public class ClustersProperties {
String password;
}

@Data
@NoArgsConstructor
@AllArgsConstructor
@ToString(exclude = {"keystorePassword"})
public static class KeystoreConfig {
String keystoreLocation;
String keystorePassword;
}

@Data
public static class Masking {
Type type;

@@ -161,7 +150,13 @@ public class ClustersProperties {
Integer auditTopicsPartitions;
Boolean topicAuditEnabled;
Boolean consoleAuditEnabled;
LogLevel level;
Map<String, String> auditTopicProperties;

public enum LogLevel {
ALL,
ALTER_ONLY //default
}
}

@@ -187,7 +182,6 @@ public class ClustersProperties {
}
}

@SuppressWarnings("unchecked")
private Map<String, Object> flattenClusterProperties(@Nullable String prefix,
@Nullable Map<String, Object> propertiesMap) {
Map<String, Object> flattened = new HashMap<>();

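The new AuditProperties block gives each cluster audit settings (topic and console toggles, audit-topic partitions and properties, and a log level whose default is ALTER_ONLY per the enum comment above). A minimal sketch of how a consumer of these properties might resolve the effective level; the resolver class and the Lombok-generated getter names are assumptions for illustration, not part of this diff:

```java
import java.util.Optional;

// Sketch only: fall back to ALTER_ONLY when no audit level is configured,
// matching the default noted on the LogLevel enum above.
class AuditLevelResolver {

  static ClustersProperties.AuditProperties.LogLevel effectiveLevel(ClustersProperties.Cluster cluster) {
    return Optional.ofNullable(cluster.getAudit())
        .map(ClustersProperties.AuditProperties::getLevel)
        .orElse(ClustersProperties.AuditProperties.LogLevel.ALTER_ONLY);
  }
}
```
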
@@ -7,8 +7,6 @@ import org.springframework.http.HttpMethod;
import org.springframework.http.HttpStatus;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.http.server.reactive.ServerHttpResponse;
import org.springframework.web.reactive.config.CorsRegistry;
import org.springframework.web.reactive.config.WebFluxConfigurer;
import org.springframework.web.server.ServerWebExchange;
import org.springframework.web.server.WebFilter;
import org.springframework.web.server.WebFilterChain;

@@ -1,7 +1,6 @@
package com.provectus.kafka.ui.config;

import com.provectus.kafka.ui.exception.ValidationException;
import java.beans.Transient;
import javax.annotation.PostConstruct;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;

@@ -1,7 +1,6 @@
package com.provectus.kafka.ui.config.auth;

import java.util.Collection;
import lombok.Value;

public record AuthenticatedUser(String principal, Collection<String> groups) {

@@ -6,11 +6,13 @@ import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.HttpMethod;
import org.springframework.security.config.annotation.web.reactive.EnableWebFluxSecurity;
import org.springframework.security.config.web.server.ServerHttpSecurity;
import org.springframework.security.web.server.SecurityWebFilterChain;
import org.springframework.security.web.server.authentication.RedirectServerAuthenticationSuccessHandler;
import org.springframework.security.web.server.authentication.logout.RedirectServerLogoutSuccessHandler;
import org.springframework.security.web.server.util.matcher.ServerWebExchangeMatchers;

@Configuration
@EnableWebFluxSecurity

@@ -39,7 +41,9 @@ public class BasicAuthSecurityConfig extends AbstractAuthSecurityConfig {
.authenticated()
)
.formLogin(spec -> spec.loginPage(LOGIN_URL).authenticationSuccessHandler(authHandler))
.logout(spec -> spec.logoutSuccessHandler(logoutSuccessHandler))
.logout(spec -> spec
.logoutSuccessHandler(logoutSuccessHandler)
.requiresLogout(ServerWebExchangeMatchers.pathMatchers(HttpMethod.GET, "/logout")))
.csrf(ServerHttpSecurity.CsrfSpec::disable)
.build();
}

@@ -99,6 +99,9 @@ public class OAuthSecurityConfig extends AbstractAuthSecurityConfig {
final OAuth2ClientProperties props = OAuthPropertiesConverter.convertProperties(properties);
final List<ClientRegistration> registrations =
new ArrayList<>(new OAuth2ClientPropertiesMapper(props).asClientRegistrations().values());
if (registrations.isEmpty()) {
throw new IllegalArgumentException("OAuth2 authentication is enabled but no providers specified.");
}
return new InMemoryReactiveClientRegistrationRepository(registrations);
}

@@ -2,7 +2,6 @@ package com.provectus.kafka.ui.config.auth;

import java.util.Collection;
import java.util.Map;
import lombok.Value;
import org.springframework.security.core.GrantedAuthority;
import org.springframework.security.oauth2.core.user.OAuth2User;

@@ -2,7 +2,6 @@ package com.provectus.kafka.ui.config.auth;

import java.util.Collection;
import java.util.Map;
import lombok.Value;
import org.springframework.security.core.GrantedAuthority;
import org.springframework.security.oauth2.core.oidc.OidcIdToken;
import org.springframework.security.oauth2.core.oidc.OidcUserInfo;

@@ -1,13 +1,14 @@
package com.provectus.kafka.ui.config.auth.condition;

import com.provectus.kafka.ui.service.rbac.AbstractProviderCondition;
import org.jetbrains.annotations.NotNull;
import org.springframework.context.annotation.Condition;
import org.springframework.context.annotation.ConditionContext;
import org.springframework.core.type.AnnotatedTypeMetadata;

public class CognitoCondition extends AbstractProviderCondition implements Condition {
@Override
public boolean matches(final ConditionContext context, final AnnotatedTypeMetadata metadata) {
public boolean matches(final ConditionContext context, final @NotNull AnnotatedTypeMetadata metadata) {
return getRegisteredProvidersTypes(context.getEnvironment()).stream().anyMatch(a -> a.equalsIgnoreCase("cognito"));
}
}
}

@@ -2,12 +2,19 @@ package com.provectus.kafka.ui.controller;

import com.provectus.kafka.ui.exception.ClusterNotFoundException;
import com.provectus.kafka.ui.model.KafkaCluster;
import com.provectus.kafka.ui.model.rbac.AccessContext;
import com.provectus.kafka.ui.service.ClustersStorage;
import com.provectus.kafka.ui.service.audit.AuditService;
import com.provectus.kafka.ui.service.rbac.AccessControlService;
import org.springframework.beans.factory.annotation.Autowired;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Signal;

public abstract class AbstractController {

private ClustersStorage clustersStorage;
protected ClustersStorage clustersStorage;
protected AccessControlService accessControlService;
protected AuditService auditService;

protected KafkaCluster getCluster(String name) {
return clustersStorage.getClusterByName(name)

@@ -15,8 +22,26 @@
String.format("Cluster with name '%s' not found", name)));
}

protected Mono<Void> validateAccess(AccessContext context) {
return accessControlService.validateAccess(context);
}

protected void audit(AccessContext acxt, Signal<?> sig) {
auditService.audit(acxt, sig);
}

@Autowired
public void setClustersStorage(ClustersStorage clustersStorage) {
this.clustersStorage = clustersStorage;
}

@Autowired
public void setAccessControlService(AccessControlService accessControlService) {
this.accessControlService = accessControlService;
}

@Autowired
public void setAuditService(AuditService auditService) {
this.auditService = auditService;
}
}

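AbstractController now injects ClustersStorage, AccessControlService, and AuditService itself and exposes validateAccess(..) and audit(..) to subclasses, which is why the controllers below drop their own accessControlService/auditService fields. A minimal sketch of the resulting endpoint shape; ThingsController, listThings, and doListThings are hypothetical names used only to show the pattern the real controllers follow:

```java
import com.provectus.kafka.ui.model.KafkaCluster;
import com.provectus.kafka.ui.model.rbac.AccessContext;
import org.springframework.http.ResponseEntity;
import reactor.core.publisher.Mono;

// Sketch only: build an AccessContext, validate it via the inherited helper,
// run the operation, then audit the reactive signal.
public class ThingsController extends AbstractController {

  public Mono<ResponseEntity<Void>> listThings(String clusterName) {
    var context = AccessContext.builder()          // builder usage mirrors the controllers below
        .cluster(clusterName)
        .operationName("listThings")               // hypothetical operation name
        .build();

    return validateAccess(context)                 // replaces accessControlService.validateAccess(context)
        .then(doListThings(getCluster(clusterName)))
        .doOnEach(sig -> audit(context, sig))      // replaces auditService.audit(context, sig)
        .thenReturn(ResponseEntity.ok().build());
  }

  private Mono<Void> doListThings(KafkaCluster cluster) {
    return Mono.empty();                           // placeholder for a real service call
  }
}
```
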
@@ -13,7 +13,6 @@ import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;

@@ -38,7 +37,7 @@ public class AccessController implements AuthorizationApi {
.filter(role -> user.groups().contains(role.getName()))
.map(role -> mapPermissions(role.getPermissions(), role.getClusters()))
.flatMap(Collection::stream)
.collect(Collectors.toList())
.toList()
)
.switchIfEmpty(Mono.just(Collections.emptyList()));

@@ -70,10 +69,10 @@ public class AccessController implements AuthorizationApi {
.map(String::toUpperCase)
.map(this::mapAction)
.filter(Objects::nonNull)
.collect(Collectors.toList()));
.toList());
return dto;
})
.collect(Collectors.toList());
.toList();
}

@Nullable

@@ -11,8 +11,6 @@ import com.provectus.kafka.ui.model.KafkaAclResourceTypeDTO;
import com.provectus.kafka.ui.model.rbac.AccessContext;
import com.provectus.kafka.ui.model.rbac.permission.AclAction;
import com.provectus.kafka.ui.service.acl.AclsService;
import com.provectus.kafka.ui.service.audit.AuditService;
import com.provectus.kafka.ui.service.rbac.AccessControlService;
import java.util.Optional;
import lombok.RequiredArgsConstructor;
import org.apache.kafka.common.resource.PatternType;

@@ -29,8 +27,6 @@ import reactor.core.publisher.Mono;
public class AclsController extends AbstractController implements AclsApi {

private final AclsService aclsService;
private final AccessControlService accessControlService;
private final AuditService auditService;

@Override
public Mono<ResponseEntity<Void>> createAcl(String clusterName, Mono<KafkaAclDTO> kafkaAclDto,

@@ -41,11 +37,11 @@ public class AclsController extends AbstractController implements AclsApi {
.operationName("createAcl")
.build();

return accessControlService.validateAccess(context)
return validateAccess(context)
.then(kafkaAclDto)
.map(ClusterMapper::toAclBinding)
.flatMap(binding -> aclsService.createAcl(getCluster(clusterName), binding))
.doOnEach(sig -> auditService.audit(context, sig))
.doOnEach(sig -> audit(context, sig))
.thenReturn(ResponseEntity.ok().build());
}

@@ -58,11 +54,11 @@ public class AclsController extends AbstractController implements AclsApi {
.operationName("deleteAcl")
.build();

return accessControlService.validateAccess(context)
return validateAccess(context)
.then(kafkaAclDto)
.map(ClusterMapper::toAclBinding)
.flatMap(binding -> aclsService.deleteAcl(getCluster(clusterName), binding))
.doOnEach(sig -> auditService.audit(context, sig))
.doOnEach(sig -> audit(context, sig))
.thenReturn(ResponseEntity.ok().build());
}

@@ -88,12 +84,12 @@ public class AclsController extends AbstractController implements AclsApi {

var filter = new ResourcePatternFilter(resourceType, resourceName, namePatternType);

return accessControlService.validateAccess(context).then(
return validateAccess(context).then(
Mono.just(
ResponseEntity.ok(
aclsService.listAcls(getCluster(clusterName), filter)
.map(ClusterMapper::toKafkaAclDto)))
).doOnEach(sig -> auditService.audit(context, sig));
).doOnEach(sig -> audit(context, sig));
}

@Override

@@ -104,11 +100,11 @@ public class AclsController extends AbstractController implements AclsApi {
.operationName("getAclAsCsv")
.build();

return accessControlService.validateAccess(context).then(
return validateAccess(context).then(
aclsService.getAclAsCsvString(getCluster(clusterName))
.map(ResponseEntity::ok)
.flatMap(Mono::just)
.doOnEach(sig -> auditService.audit(context, sig))
.doOnEach(sig -> audit(context, sig))
);
}

@@ -120,10 +116,10 @@ public class AclsController extends AbstractController implements AclsApi {
.operationName("syncAclsCsv")
.build();

return accessControlService.validateAccess(context)
return validateAccess(context)
.then(csvMono)
.flatMap(csv -> aclsService.syncAclWithAclCsv(getCluster(clusterName), csv))
.doOnEach(sig -> auditService.audit(context, sig))
.doOnEach(sig -> audit(context, sig))
.thenReturn(ResponseEntity.ok().build());
}

@@ -137,10 +133,10 @@ public class AclsController extends AbstractController implements AclsApi {
.operationName("createConsumerAcl")
.build();

return accessControlService.validateAccess(context)
return validateAccess(context)
.then(createConsumerAclDto)
.flatMap(req -> aclsService.createConsumerAcl(getCluster(clusterName), req))
.doOnEach(sig -> auditService.audit(context, sig))
.doOnEach(sig -> audit(context, sig))
.thenReturn(ResponseEntity.ok().build());
}

@@ -154,10 +150,10 @@ public class AclsController extends AbstractController implements AclsApi {
.operationName("createProducerAcl")
.build();

return accessControlService.validateAccess(context)
return validateAccess(context)
.then(createProducerAclDto)
.flatMap(req -> aclsService.createProducerAcl(getCluster(clusterName), req))
.doOnEach(sig -> auditService.audit(context, sig))
.doOnEach(sig -> audit(context, sig))
.thenReturn(ResponseEntity.ok().build());
}

@@ -171,10 +167,10 @@ public class AclsController extends AbstractController implements AclsApi {
.operationName("createStreamAppAcl")
.build();

return accessControlService.validateAccess(context)
return validateAccess(context)
.then(createStreamAppAclDto)
.flatMap(req -> aclsService.createStreamAppAcl(getCluster(clusterName), req))
.doOnEach(sig -> auditService.audit(context, sig))
.doOnEach(sig -> audit(context, sig))
.thenReturn(ResponseEntity.ok().build());
}
}

@@ -15,8 +15,6 @@ import com.provectus.kafka.ui.model.UploadedFileInfoDTO;
import com.provectus.kafka.ui.model.rbac.AccessContext;
import com.provectus.kafka.ui.service.ApplicationInfoService;
import com.provectus.kafka.ui.service.KafkaClusterFactory;
import com.provectus.kafka.ui.service.audit.AuditService;
import com.provectus.kafka.ui.service.rbac.AccessControlService;
import com.provectus.kafka.ui.util.ApplicationRestarter;
import com.provectus.kafka.ui.util.DynamicConfigOperations;
import com.provectus.kafka.ui.util.DynamicConfigOperations.PropertiesStructure;

@@ -39,7 +37,7 @@ import reactor.util.function.Tuples;
@Slf4j
@RestController
@RequiredArgsConstructor
public class ApplicationConfigController implements ApplicationConfigApi {
public class ApplicationConfigController extends AbstractController implements ApplicationConfigApi {

private static final PropertiesMapper MAPPER = Mappers.getMapper(PropertiesMapper.class);

@@ -51,12 +49,10 @@ public class ApplicationConfigController implements ApplicationConfigApi {
ApplicationConfigPropertiesDTO toDto(PropertiesStructure propertiesStructure);
}

private final AccessControlService accessControlService;
private final DynamicConfigOperations dynamicConfigOperations;
private final ApplicationRestarter restarter;
private final KafkaClusterFactory kafkaClusterFactory;
private final ApplicationInfoService applicationInfoService;
private final AuditService auditService;

@Override
public Mono<ResponseEntity<ApplicationInfoDTO>> getApplicationInfo(ServerWebExchange exchange) {

@@ -69,12 +65,12 @@ public class ApplicationConfigController implements ApplicationConfigApi {
.applicationConfigActions(VIEW)
.operationName("getCurrentConfig")
.build();
return accessControlService.validateAccess(context)
return validateAccess(context)
.then(Mono.fromSupplier(() -> ResponseEntity.ok(
new ApplicationConfigDTO()
.properties(MAPPER.toDto(dynamicConfigOperations.getCurrentProperties()))
)))
.doOnEach(sig -> auditService.audit(context, sig));
.doOnEach(sig -> audit(context, sig));
}

@Override

@@ -84,14 +80,15 @@ public class ApplicationConfigController implements ApplicationConfigApi {
.applicationConfigActions(EDIT)
.operationName("restartWithConfig")
.build();
return accessControlService.validateAccess(context)
return validateAccess(context)
.then(restartRequestDto)
.<ResponseEntity<Void>>map(dto -> {
dynamicConfigOperations.persist(MAPPER.fromDto(dto.getConfig().getProperties()));
restarter.requestRestart();
return ResponseEntity.ok().build();
.doOnNext(restartDto -> {
var newConfig = MAPPER.fromDto(restartDto.getConfig().getProperties());
dynamicConfigOperations.persist(newConfig);
})
.doOnEach(sig -> auditService.audit(context, sig));
.doOnEach(sig -> audit(context, sig))
.doOnSuccess(dto -> restarter.requestRestart())
.map(dto -> ResponseEntity.ok().build());
}

@Override

@@ -101,13 +98,13 @@ public class ApplicationConfigController implements ApplicationConfigApi {
.applicationConfigActions(EDIT)
.operationName("uploadConfigRelatedFile")
.build();
return accessControlService.validateAccess(context)
return validateAccess(context)
.then(fileFlux.single())
.flatMap(file ->
dynamicConfigOperations.uploadConfigRelatedFile((FilePart) file)
.map(path -> new UploadedFileInfoDTO().location(path.toString()))
.map(ResponseEntity::ok))
.doOnEach(sig -> auditService.audit(context, sig));
.doOnEach(sig -> audit(context, sig));
}

@Override

@@ -117,16 +114,16 @@ public class ApplicationConfigController implements ApplicationConfigApi {
.applicationConfigActions(EDIT)
.operationName("validateConfig")
.build();
return accessControlService.validateAccess(context)
return validateAccess(context)
.then(configDto)
.flatMap(config -> {
PropertiesStructure propertiesStructure = MAPPER.fromDto(config.getProperties());
ClustersProperties clustersProperties = propertiesStructure.getKafka();
PropertiesStructure newConfig = MAPPER.fromDto(config.getProperties());
ClustersProperties clustersProperties = newConfig.getKafka();
return validateClustersConfig(clustersProperties)
.map(validations -> new ApplicationConfigValidationDTO().clusters(validations));
})
.map(ResponseEntity::ok)
.doOnEach(sig -> auditService.audit(context, sig));
.doOnEach(sig -> audit(context, sig));
}

private Mono<Map<String, ClusterConfigValidationDTO>> validateClustersConfig(

@@ -36,10 +36,10 @@ public class AuthController {
+ "  <meta name=\"description\" content=\"\">\n"
+ "  <meta name=\"author\" content=\"\">\n"
+ "  <title>Please sign in</title>\n"
+ "  <link href=\"/static/css/bootstrap.min.css\" rel=\"stylesheet\" "
+ "  <link href=\"" + contextPath + "/static/css/bootstrap.min.css\" rel=\"stylesheet\" "
+ "integrity=\"sha384-/Y6pD6FV/Vv2HJnA6t+vslU6fwYXjCFtcEpHbNJ0lyAFsXTsjBbfaDjzALeQsN6M\" "
+ "crossorigin=\"anonymous\">\n"
+ "  <link href=\"/static/css/signin.css\" "
+ "  <link href=\"" + contextPath + "/static/css/signin.css\" "
+ "rel=\"stylesheet\" crossorigin=\"anonymous\"/>\n"
+ "  </head>\n"
+ "  <body>\n"

@@ -11,8 +11,6 @@ import com.provectus.kafka.ui.model.BrokersLogdirsDTO;
import com.provectus.kafka.ui.model.rbac.AccessContext;
import com.provectus.kafka.ui.model.rbac.permission.ClusterConfigAction;
import com.provectus.kafka.ui.service.BrokerService;
import com.provectus.kafka.ui.service.audit.AuditService;
import com.provectus.kafka.ui.service.rbac.AccessControlService;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;

@@ -28,12 +26,11 @@ import reactor.core.publisher.Mono;
@RequiredArgsConstructor
@Slf4j
public class BrokersController extends AbstractController implements BrokersApi {
private static final String BROKER_ID = "brokerId";

private final BrokerService brokerService;
private final ClusterMapper clusterMapper;

private final AuditService auditService;
private final AccessControlService accessControlService;

@Override
public Mono<ResponseEntity<Flux<BrokerDTO>>> getBrokers(String clusterName,
ServerWebExchange exchange) {

@@ -43,9 +40,9 @@ public class BrokersController extends AbstractController implements BrokersApi
.build();

var job = brokerService.getBrokers(getCluster(clusterName)).map(clusterMapper::toBrokerDto);
return accessControlService.validateAccess(context)
return validateAccess(context)
.thenReturn(ResponseEntity.ok(job))
.doOnEach(sig -> auditService.audit(context, sig));
.doOnEach(sig -> audit(context, sig));
}

@Override

@@ -57,14 +54,14 @@ public class BrokersController extends AbstractController implements BrokersApi
.operationParams(Map.of("id", id))
.build();

return accessControlService.validateAccess(context)
return validateAccess(context)
.then(
brokerService.getBrokerMetrics(getCluster(clusterName), id)
.map(clusterMapper::toBrokerMetrics)
.map(ResponseEntity::ok)
.onErrorReturn(ResponseEntity.notFound().build())
)
.doOnEach(sig -> auditService.audit(context, sig));
.doOnEach(sig -> audit(context, sig));
}

@Override

@@ -80,10 +77,10 @@ public class BrokersController extends AbstractController implements BrokersApi
.operationParams(Map.of("brokerIds", brokerIds))
.build();

return accessControlService.validateAccess(context)
return validateAccess(context)
.thenReturn(ResponseEntity.ok(
brokerService.getAllBrokersLogdirs(getCluster(clusterName), brokerIds)))
.doOnEach(sig -> auditService.audit(context, sig));
.doOnEach(sig -> audit(context, sig));
}

@Override

@@ -94,14 +91,14 @@ public class BrokersController extends AbstractController implements BrokersApi
.cluster(clusterName)
.clusterConfigActions(ClusterConfigAction.VIEW)
.operationName("getBrokerConfig")
.operationParams(Map.of("brokerId", id))
.operationParams(Map.of(BROKER_ID, id))
.build();

return accessControlService.validateAccess(context).thenReturn(
return validateAccess(context).thenReturn(
ResponseEntity.ok(
brokerService.getBrokerConfig(getCluster(clusterName), id)
.map(clusterMapper::toBrokerConfig))
).doOnEach(sig -> auditService.audit(context, sig));
).doOnEach(sig -> audit(context, sig));
}

@Override

@@ -113,14 +110,14 @@ public class BrokersController extends AbstractController implements BrokersApi
.cluster(clusterName)
.clusterConfigActions(ClusterConfigAction.VIEW, ClusterConfigAction.EDIT)
.operationName("updateBrokerTopicPartitionLogDir")
.operationParams(Map.of("brokerId", id))
.operationParams(Map.of(BROKER_ID, id))
.build();

return accessControlService.validateAccess(context).then(
return validateAccess(context).then(
brokerLogdir
.flatMap(bld -> brokerService.updateBrokerLogDir(getCluster(clusterName), id, bld))
.map(ResponseEntity::ok)
).doOnEach(sig -> auditService.audit(context, sig));
).doOnEach(sig -> audit(context, sig));
}

@Override

@@ -133,14 +130,14 @@ public class BrokersController extends AbstractController implements BrokersApi
.cluster(clusterName)
.clusterConfigActions(ClusterConfigAction.VIEW, ClusterConfigAction.EDIT)
.operationName("updateBrokerConfigByName")
.operationParams(Map.of("brokerId", id))
.operationParams(Map.of(BROKER_ID, id))
.build();

return accessControlService.validateAccess(context).then(
return validateAccess(context).then(
brokerConfig
.flatMap(bci -> brokerService.updateBrokerConfigByName(
getCluster(clusterName), id, name, bci.getValue()))
.map(ResponseEntity::ok)
).doOnEach(sig -> auditService.audit(context, sig));
).doOnEach(sig -> audit(context, sig));
}
}

@@ -6,8 +6,6 @@ import com.provectus.kafka.ui.model.ClusterMetricsDTO;
import com.provectus.kafka.ui.model.ClusterStatsDTO;
import com.provectus.kafka.ui.model.rbac.AccessContext;
import com.provectus.kafka.ui.service.ClusterService;
import com.provectus.kafka.ui.service.audit.AuditService;
import com.provectus.kafka.ui.service.rbac.AccessControlService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.ResponseEntity;

@@ -21,8 +19,6 @@ import reactor.core.publisher.Mono;
@Slf4j
public class ClustersController extends AbstractController implements ClustersApi {
private final ClusterService clusterService;
private final AccessControlService accessControlService;
private final AuditService auditService;

@Override
public Mono<ResponseEntity<Flux<ClusterDTO>>> getClusters(ServerWebExchange exchange) {

@@ -40,13 +36,13 @@ public class ClustersController extends AbstractController implements ClustersAp
.operationName("getClusterMetrics")
.build();

return accessControlService.validateAccess(context)
return validateAccess(context)
.then(
clusterService.getClusterMetrics(getCluster(clusterName))
.map(ResponseEntity::ok)
.onErrorReturn(ResponseEntity.notFound().build())
)
.doOnEach(sig -> auditService.audit(context, sig));
.doOnEach(sig -> audit(context, sig));
}

@Override

@@ -57,13 +53,13 @@ public class ClustersController extends AbstractController implements ClustersAp
.operationName("getClusterStats")
.build();

return accessControlService.validateAccess(context)
return validateAccess(context)
.then(
clusterService.getClusterStats(getCluster(clusterName))
.map(ResponseEntity::ok)
.onErrorReturn(ResponseEntity.notFound().build())
)
.doOnEach(sig -> auditService.audit(context, sig));
.doOnEach(sig -> audit(context, sig));
}

@Override

@@ -75,8 +71,8 @@ public class ClustersController extends AbstractController implements ClustersAp
.operationName("updateClusterInfo")
.build();

return accessControlService.validateAccess(context)
return validateAccess(context)
.then(clusterService.updateCluster(getCluster(clusterName)).map(ResponseEntity::ok))
.doOnEach(sig -> auditService.audit(context, sig));
.doOnEach(sig -> audit(context, sig));
}
}

@@ -19,12 +19,9 @@ import com.provectus.kafka.ui.model.rbac.AccessContext;
import com.provectus.kafka.ui.model.rbac.permission.TopicAction;
import com.provectus.kafka.ui.service.ConsumerGroupService;
import com.provectus.kafka.ui.service.OffsetsResetService;
import com.provectus.kafka.ui.service.audit.AuditService;
import com.provectus.kafka.ui.service.rbac.AccessControlService;
import java.util.Map;
import java.util.Optional;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;

@@ -42,8 +39,6 @@ public class ConsumerGroupsController extends AbstractController implements Cons

private final ConsumerGroupService consumerGroupService;
private final OffsetsResetService offsetsResetService;
private final AccessControlService accessControlService;
private final AuditService auditService;

@Value("${consumer.groups.page.size:25}")
private int defaultConsumerGroupsPageSize;

@@ -59,9 +54,9 @@ public class ConsumerGroupsController extends AbstractController implements Cons
.operationName("deleteConsumerGroup")
.build();

return accessControlService.validateAccess(context)
return validateAccess(context)
.then(consumerGroupService.deleteConsumerGroupById(getCluster(clusterName), id))
.doOnEach(sig -> auditService.audit(context, sig))
.doOnEach(sig -> audit(context, sig))
.thenReturn(ResponseEntity.ok().build());
}

@@ -76,11 +71,11 @@ public class ConsumerGroupsController extends AbstractController implements Cons
.operationName("getConsumerGroup")
.build();

return accessControlService.validateAccess(context)
return validateAccess(context)
.then(consumerGroupService.getConsumerGroupDetail(getCluster(clusterName), consumerGroupId)
.map(ConsumerGroupMapper::toDetailsDto)
.map(ResponseEntity::ok))
.doOnEach(sig -> auditService.audit(context, sig));
.doOnEach(sig -> audit(context, sig));
}

@Override

@@ -104,9 +99,9 @@ public class ConsumerGroupsController extends AbstractController implements Cons
.map(ResponseEntity::ok)
.switchIfEmpty(Mono.just(ResponseEntity.notFound().build()));

return accessControlService.validateAccess(context)
return validateAccess(context)
.then(job)
.doOnEach(sig -> auditService.audit(context, sig));
.doOnEach(sig -> audit(context, sig));
}

@Override

@@ -125,7 +120,7 @@ public class ConsumerGroupsController extends AbstractController implements Cons
.operationName("getConsumerGroupsPage")
.build();

return accessControlService.validateAccess(context).then(
return validateAccess(context).then(
consumerGroupService.getConsumerGroupsPage(
getCluster(clusterName),
Optional.ofNullable(page).filter(i -> i > 0).orElse(1),

@@ -136,7 +131,7 @@ public class ConsumerGroupsController extends AbstractController implements Cons
)
.map(this::convertPage)
.map(ResponseEntity::ok)
).doOnEach(sig -> auditService.audit(context, sig));
).doOnEach(sig -> audit(context, sig));
}

@Override

@@ -191,9 +186,9 @@ public class ConsumerGroupsController extends AbstractController implements Cons
}
};

return accessControlService.validateAccess(context)
return validateAccess(context)
.then(mono.get())
.doOnEach(sig -> auditService.audit(context, sig));
.doOnEach(sig -> audit(context, sig));
}).thenReturn(ResponseEntity.ok().build());
}

@@ -204,7 +199,7 @@ public class ConsumerGroupsController extends AbstractController implements Cons
.consumerGroups(consumerGroupConsumerGroupsPage.consumerGroups()
.stream()
.map(ConsumerGroupMapper::toDto)
.collect(Collectors.toList()));
.toList());
}

}

@@ -18,8 +18,6 @@ import com.provectus.kafka.ui.model.TaskDTO;
import com.provectus.kafka.ui.model.rbac.AccessContext;
import com.provectus.kafka.ui.model.rbac.permission.ConnectAction;
import com.provectus.kafka.ui.service.KafkaConnectService;
import com.provectus.kafka.ui.service.audit.AuditService;
import com.provectus.kafka.ui.service.rbac.AccessControlService;
import java.util.Comparator;
import java.util.Map;
import java.util.Set;

@@ -38,10 +36,9 @@ import reactor.core.publisher.Mono;
public class KafkaConnectController extends AbstractController implements KafkaConnectApi {
private static final Set<ConnectorActionDTO> RESTART_ACTIONS
= Set.of(RESTART, RESTART_FAILED_TASKS, RESTART_ALL_TASKS);
private static final String CONNECTOR_NAME = "connectorName";

private final KafkaConnectService kafkaConnectService;
private final AccessControlService accessControlService;
private final AuditService auditService;

@Override
public Mono<ResponseEntity<Flux<ConnectDTO>>> getConnects(String clusterName,

@@ -64,9 +61,9 @@ public class KafkaConnectController extends AbstractController implements KafkaC
.operationName("getConnectors")
.build();

return accessControlService.validateAccess(context)
return validateAccess(context)
.thenReturn(ResponseEntity.ok(kafkaConnectService.getConnectorNames(getCluster(clusterName), connectName)))
.doOnEach(sig -> auditService.audit(context, sig));
.doOnEach(sig -> audit(context, sig));
}

@Override

@@ -81,10 +78,10 @@ public class KafkaConnectController extends AbstractController implements KafkaC
.operationName("createConnector")
.build();

return accessControlService.validateAccess(context).then(
return validateAccess(context).then(
kafkaConnectService.createConnector(getCluster(clusterName), connectName, connector)
.map(ResponseEntity::ok)
).doOnEach(sig -> auditService.audit(context, sig));
).doOnEach(sig -> audit(context, sig));
}

@Override

@@ -100,10 +97,10 @@ public class KafkaConnectController extends AbstractController implements KafkaC
.operationName("getConnector")
.build();

return accessControlService.validateAccess(context).then(
return validateAccess(context).then(
kafkaConnectService.getConnector(getCluster(clusterName), connectName, connectorName)
.map(ResponseEntity::ok)
).doOnEach(sig -> auditService.audit(context, sig));
).doOnEach(sig -> audit(context, sig));
}

@Override

@@ -116,13 +113,13 @@ public class KafkaConnectController extends AbstractController implements KafkaC
.connect(connectName)
.connectActions(ConnectAction.VIEW, ConnectAction.EDIT)
.operationName("deleteConnector")
.operationParams(Map.of("connectorName", connectName))
.operationParams(Map.of(CONNECTOR_NAME, connectName))
.build();

return accessControlService.validateAccess(context).then(
return validateAccess(context).then(
kafkaConnectService.deleteConnector(getCluster(clusterName), connectName, connectorName)
.map(ResponseEntity::ok)
).doOnEach(sig -> auditService.audit(context, sig));
).doOnEach(sig -> audit(context, sig));
}

@@ -150,7 +147,7 @@ public class KafkaConnectController extends AbstractController implements KafkaC
.sort(comparator);

return Mono.just(ResponseEntity.ok(job))
.doOnEach(sig -> auditService.audit(context, sig));
.doOnEach(sig -> audit(context, sig));
}

@Override

@@ -166,11 +163,11 @@ public class KafkaConnectController extends AbstractController implements KafkaC
.operationName("getConnectorConfig")
.build();

return accessControlService.validateAccess(context).then(
return validateAccess(context).then(
kafkaConnectService
.getConnectorConfig(getCluster(clusterName), connectName, connectorName)
.map(ResponseEntity::ok)
).doOnEach(sig -> auditService.audit(context, sig));
).doOnEach(sig -> audit(context, sig));
}

@Override

@@ -184,14 +181,14 @@ public class KafkaConnectController extends AbstractController implements KafkaC
.connect(connectName)
.connectActions(ConnectAction.VIEW, ConnectAction.EDIT)
.operationName("setConnectorConfig")
.operationParams(Map.of("connectorName", connectorName))
.operationParams(Map.of(CONNECTOR_NAME, connectorName))
.build();

return accessControlService.validateAccess(context).then(
return validateAccess(context).then(
kafkaConnectService
.setConnectorConfig(getCluster(clusterName), connectName, connectorName, requestBody)
.map(ResponseEntity::ok))
.doOnEach(sig -> auditService.audit(context, sig));
.doOnEach(sig -> audit(context, sig));
}

@Override

@@ -211,14 +208,14 @@ public class KafkaConnectController extends AbstractController implements KafkaC
.connect(connectName)
.connectActions(connectActions)
.operationName("updateConnectorState")
.operationParams(Map.of("connectorName", connectorName))
.operationParams(Map.of(CONNECTOR_NAME, connectorName))
.build();

return accessControlService.validateAccess(context).then(
return validateAccess(context).then(
kafkaConnectService
.updateConnectorState(getCluster(clusterName), connectName, connectorName, action)
.map(ResponseEntity::ok)
).doOnEach(sig -> auditService.audit(context, sig));
).doOnEach(sig -> audit(context, sig));
}

@Override

@@ -231,14 +228,14 @@ public class KafkaConnectController extends AbstractController implements KafkaC
.connect(connectName)
.connectActions(ConnectAction.VIEW)
.operationName("getConnectorTasks")
.operationParams(Map.of("connectorName", connectorName))
.operationParams(Map.of(CONNECTOR_NAME, connectorName))
.build();

return accessControlService.validateAccess(context).thenReturn(
return validateAccess(context).thenReturn(
ResponseEntity
.ok(kafkaConnectService
.getConnectorTasks(getCluster(clusterName), connectName, connectorName))
).doOnEach(sig -> auditService.audit(context, sig));
).doOnEach(sig -> audit(context, sig));
}

@Override

@@ -251,14 +248,14 @@ public class KafkaConnectController extends AbstractController implements KafkaC
.connect(connectName)
.connectActions(ConnectAction.VIEW, ConnectAction.RESTART)
.operationName("restartConnectorTask")
.operationParams(Map.of("connectorName", connectorName))
.operationParams(Map.of(CONNECTOR_NAME, connectorName))
.build();

return accessControlService.validateAccess(context).then(
return validateAccess(context).then(
kafkaConnectService
.restartConnectorTask(getCluster(clusterName), connectName, connectorName, taskId)
.map(ResponseEntity::ok)
).doOnEach(sig -> auditService.audit(context, sig));
).doOnEach(sig -> audit(context, sig));
}

@Override

@@ -272,11 +269,11 @@ public class KafkaConnectController extends AbstractController implements KafkaC
.operationName("getConnectorPlugins")
.build();

return accessControlService.validateAccess(context).then(
return validateAccess(context).then(
Mono.just(
ResponseEntity.ok(
kafkaConnectService.getConnectorPlugins(getCluster(clusterName), connectName)))
).doOnEach(sig -> auditService.audit(context, sig));
).doOnEach(sig -> audit(context, sig));
}

@Override

@@ -9,9 +9,7 @@ import com.provectus.kafka.ui.model.KsqlTableDescriptionDTO;
import com.provectus.kafka.ui.model.KsqlTableResponseDTO;
import com.provectus.kafka.ui.model.rbac.AccessContext;
import com.provectus.kafka.ui.model.rbac.permission.KsqlAction;
import com.provectus.kafka.ui.service.audit.AuditService;
import com.provectus.kafka.ui.service.ksql.KsqlServiceV2;
import com.provectus.kafka.ui.service.rbac.AccessControlService;
import java.util.List;
import java.util.Map;
import java.util.Optional;

@@ -29,8 +27,6 @@ import reactor.core.publisher.Mono;
public class KsqlController extends AbstractController implements KsqlApi {

private final KsqlServiceV2 ksqlServiceV2;
private final AccessControlService accessControlService;
private final AuditService auditService;

@Override
public Mono<ResponseEntity<KsqlCommandV2ResponseDTO>> executeKsql(String clusterName,

@@ -44,13 +40,13 @@ public class KsqlController extends AbstractController implements KsqlApi {
.operationName("executeKsql")
.operationParams(command)
.build();
return accessControlService.validateAccess(context).thenReturn(
return validateAccess(context).thenReturn(
new KsqlCommandV2ResponseDTO().pipeId(
ksqlServiceV2.registerCommand(
getCluster(clusterName),
command.getKsql(),
Optional.ofNullable(command.getStreamsProperties()).orElse(Map.of()))))
.doOnEach(sig -> auditService.audit(context, sig));
.doOnEach(sig -> audit(context, sig));
}
)
.map(ResponseEntity::ok);

@@ -66,7 +62,7 @@ public class KsqlController extends AbstractController implements KsqlApi {
.operationName("openKsqlResponsePipe")
.build();

return accessControlService.validateAccess(context).thenReturn(
return validateAccess(context).thenReturn(
ResponseEntity.ok(ksqlServiceV2.execute(pipeId)
.map(table -> new KsqlResponseDTO()
.table(

@@ -86,9 +82,9 @@ public class KsqlController extends AbstractController implements KsqlApi {
.operationName("listStreams")
.build();

return accessControlService.validateAccess(context)
return validateAccess(context)
.thenReturn(ResponseEntity.ok(ksqlServiceV2.listStreams(getCluster(clusterName))))
.doOnEach(sig -> auditService.audit(context, sig));
.doOnEach(sig -> audit(context, sig));
}

@Override

@@ -100,8 +96,8 @@ public class KsqlController extends AbstractController implements KsqlApi {
.operationName("listTables")
.build();

return accessControlService.validateAccess(context)
return validateAccess(context)
.thenReturn(ResponseEntity.ok(ksqlServiceV2.listTables(getCluster(clusterName))))
.doOnEach(sig -> auditService.audit(context, sig));
.doOnEach(sig -> audit(context, sig));
}
}

@ -24,8 +24,7 @@ import com.provectus.kafka.ui.model.rbac.permission.AuditAction;
|
|||
import com.provectus.kafka.ui.model.rbac.permission.TopicAction;
|
||||
import com.provectus.kafka.ui.service.DeserializationService;
|
||||
import com.provectus.kafka.ui.service.MessagesService;
|
||||
import com.provectus.kafka.ui.service.audit.AuditService;
|
||||
import com.provectus.kafka.ui.service.rbac.AccessControlService;
|
||||
import com.provectus.kafka.ui.util.DynamicConfigOperations;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
|
@ -33,6 +32,7 @@ import javax.annotation.Nullable;
|
|||
import javax.validation.Valid;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.commons.lang3.tuple.Pair;
|
||||
import org.apache.kafka.common.TopicPartition;
|
||||
import org.springframework.http.ResponseEntity;
|
||||
|
@ -49,8 +49,7 @@ public class MessagesController extends AbstractController implements MessagesAp
|
|||
|
||||
private final MessagesService messagesService;
|
||||
private final DeserializationService deserializationService;
|
||||
private final AccessControlService accessControlService;
|
||||
private final AuditService auditService;
|
||||
private final DynamicConfigOperations dynamicConfigOperations;
|
||||
|
||||
@Override
|
||||
public Mono<ResponseEntity<Void>> deleteTopicMessages(
|
||||
|
@ -63,13 +62,13 @@ public class MessagesController extends AbstractController implements MessagesAp
|
|||
.topicActions(MESSAGES_DELETE)
|
||||
.build();
|
||||
|
||||
return accessControlService.validateAccess(context).<ResponseEntity<Void>>then(
|
||||
return validateAccess(context).<ResponseEntity<Void>>then(
|
||||
messagesService.deleteTopicMessages(
|
||||
getCluster(clusterName),
|
||||
topicName,
|
||||
Optional.ofNullable(partitions).orElse(List.of())
|
||||
).thenReturn(ResponseEntity.ok().build())
|
||||
).doOnEach(sig -> auditService.audit(context, sig));
|
||||
).doOnEach(sig -> audit(context, sig));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -98,6 +97,10 @@ public class MessagesController extends AbstractController implements MessagesAp
|
|||
.topicActions(MESSAGES_READ)
|
||||
.operationName("getTopicMessages");
|
||||
|
||||
if (StringUtils.isNoneEmpty(q) && MessageFilterTypeDTO.GROOVY_SCRIPT == filterQueryType) {
|
||||
dynamicConfigOperations.checkIfFilteringGroovyEnabled();
|
||||
}
|
||||
|
||||
if (auditService.isAuditTopic(getCluster(clusterName), topicName)) {
|
||||
contextBuilder.auditActions(AuditAction.VIEW);
|
||||
}
|
||||
|
@ -120,9 +123,9 @@ public class MessagesController extends AbstractController implements MessagesAp
|
|||
);
|
||||
|
||||
var context = contextBuilder.build();
|
||||
return accessControlService.validateAccess(context)
|
||||
return validateAccess(context)
|
||||
.then(job)
|
||||
.doOnEach(sig -> auditService.audit(context, sig));
|
||||
.doOnEach(sig -> audit(context, sig));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -137,11 +140,11 @@ public class MessagesController extends AbstractController implements MessagesAp
|
|||
.operationName("sendTopicMessages")
|
||||
.build();
|
||||
|
||||
return accessControlService.validateAccess(context).then(
|
||||
return validateAccess(context).then(
|
||||
createTopicMessage.flatMap(msg ->
|
||||
messagesService.sendMessage(getCluster(clusterName), topicName, msg).then()
|
||||
).map(ResponseEntity::ok)
|
||||
).doOnEach(sig -> auditService.audit(context, sig));
|
||||
).doOnEach(sig -> audit(context, sig));
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -192,7 +195,7 @@ public class MessagesController extends AbstractController implements MessagesAp
|
|||
? deserializationService.getSerdesForSerialize(getCluster(clusterName), topicName, VALUE)
|
||||
: deserializationService.getSerdesForDeserialize(getCluster(clusterName), topicName, VALUE));
|
||||
|
||||
return accessControlService.validateAccess(context).then(
|
||||
return validateAccess(context).then(
|
||||
Mono.just(dto)
|
||||
.subscribeOn(Schedulers.boundedElastic())
|
||||
.map(ResponseEntity::ok)
@ -13,11 +13,8 @@ import com.provectus.kafka.ui.model.SchemaSubjectsResponseDTO;
|
|||
import com.provectus.kafka.ui.model.rbac.AccessContext;
|
||||
import com.provectus.kafka.ui.model.rbac.permission.SchemaAction;
|
||||
import com.provectus.kafka.ui.service.SchemaRegistryService;
|
||||
import com.provectus.kafka.ui.service.audit.AuditService;
|
||||
import com.provectus.kafka.ui.service.rbac.AccessControlService;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.stream.Collectors;
|
||||
import javax.validation.Valid;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
@ -38,8 +35,6 @@ public class SchemasController extends AbstractController implements SchemasApi
|
|||
private final KafkaSrMapper kafkaSrMapper = new KafkaSrMapperImpl();
|
||||
|
||||
private final SchemaRegistryService schemaRegistryService;
|
||||
private final AccessControlService accessControlService;
|
||||
private final AuditService auditService;
|
||||
|
||||
@Override
|
||||
protected KafkaCluster getCluster(String clusterName) {
|
||||
|
@ -61,7 +56,7 @@ public class SchemasController extends AbstractController implements SchemasApi
|
|||
.operationName("checkSchemaCompatibility")
|
||||
.build();
|
||||
|
||||
return accessControlService.validateAccess(context).then(
|
||||
return validateAccess(context).then(
|
||||
newSchemaSubjectMono.flatMap(subjectDTO ->
|
||||
schemaRegistryService.checksSchemaCompatibility(
|
||||
getCluster(clusterName),
|
||||
|
@ -70,7 +65,7 @@ public class SchemasController extends AbstractController implements SchemasApi
|
|||
))
|
||||
.map(kafkaSrMapper::toDto)
|
||||
.map(ResponseEntity::ok)
|
||||
).doOnEach(sig -> auditService.audit(context, sig));
|
||||
).doOnEach(sig -> audit(context, sig));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -83,7 +78,7 @@ public class SchemasController extends AbstractController implements SchemasApi
|
|||
.operationName("createNewSchema")
|
||||
.build();
|
||||
|
||||
return accessControlService.validateAccess(context).then(
|
||||
return validateAccess(context).then(
|
||||
newSchemaSubjectMono.flatMap(newSubject ->
|
||||
schemaRegistryService.registerNewSchema(
|
||||
getCluster(clusterName),
|
||||
|
@ -92,7 +87,7 @@ public class SchemasController extends AbstractController implements SchemasApi
|
|||
)
|
||||
).map(kafkaSrMapper::toDto)
|
||||
.map(ResponseEntity::ok)
|
||||
).doOnEach(sig -> auditService.audit(context, sig));
|
||||
).doOnEach(sig -> audit(context, sig));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -105,9 +100,9 @@ public class SchemasController extends AbstractController implements SchemasApi
|
|||
.operationName("deleteLatestSchema")
|
||||
.build();
|
||||
|
||||
return accessControlService.validateAccess(context).then(
|
||||
return validateAccess(context).then(
|
||||
schemaRegistryService.deleteLatestSchemaSubject(getCluster(clusterName), subject)
|
||||
.doOnEach(sig -> auditService.audit(context, sig))
|
||||
.doOnEach(sig -> audit(context, sig))
|
||||
.thenReturn(ResponseEntity.ok().build())
|
||||
);
|
||||
}
|
||||
|
@ -122,9 +117,9 @@ public class SchemasController extends AbstractController implements SchemasApi
|
|||
.operationName("deleteSchema")
|
||||
.build();
|
||||
|
||||
return accessControlService.validateAccess(context).then(
|
||||
return validateAccess(context).then(
|
||||
schemaRegistryService.deleteSchemaSubjectEntirely(getCluster(clusterName), subject)
|
||||
.doOnEach(sig -> auditService.audit(context, sig))
|
||||
.doOnEach(sig -> audit(context, sig))
|
||||
.thenReturn(ResponseEntity.ok().build())
|
||||
);
|
||||
}
|
||||
|
@ -139,9 +134,9 @@ public class SchemasController extends AbstractController implements SchemasApi
|
|||
.operationName("deleteSchemaByVersion")
|
||||
.build();
|
||||
|
||||
return accessControlService.validateAccess(context).then(
|
||||
return validateAccess(context).then(
|
||||
schemaRegistryService.deleteSchemaSubjectByVersion(getCluster(clusterName), subjectName, version)
|
||||
.doOnEach(sig -> auditService.audit(context, sig))
|
||||
.doOnEach(sig -> audit(context, sig))
|
||||
.thenReturn(ResponseEntity.ok().build())
|
||||
);
|
||||
}
|
||||
|
@ -160,9 +155,9 @@ public class SchemasController extends AbstractController implements SchemasApi
|
|||
schemaRegistryService.getAllVersionsBySubject(getCluster(clusterName), subjectName)
|
||||
.map(kafkaSrMapper::toDto);
|
||||
|
||||
return accessControlService.validateAccess(context)
|
||||
return validateAccess(context)
|
||||
.thenReturn(ResponseEntity.ok(schemas))
|
||||
.doOnEach(sig -> auditService.audit(context, sig));
|
||||
.doOnEach(sig -> audit(context, sig));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -185,11 +180,11 @@ public class SchemasController extends AbstractController implements SchemasApi
|
|||
.operationName("getLatestSchema")
|
||||
.build();
|
||||
|
||||
return accessControlService.validateAccess(context).then(
|
||||
return validateAccess(context).then(
|
||||
schemaRegistryService.getLatestSchemaVersionBySubject(getCluster(clusterName), subject)
|
||||
.map(kafkaSrMapper::toDto)
|
||||
.map(ResponseEntity::ok)
|
||||
).doOnEach(sig -> auditService.audit(context, sig));
|
||||
).doOnEach(sig -> audit(context, sig));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -203,12 +198,12 @@ public class SchemasController extends AbstractController implements SchemasApi
|
|||
.operationParams(Map.of("subject", subject, "version", version))
|
||||
.build();
|
||||
|
||||
return accessControlService.validateAccess(context).then(
|
||||
return validateAccess(context).then(
|
||||
schemaRegistryService.getSchemaSubjectByVersion(
|
||||
getCluster(clusterName), subject, version)
|
||||
.map(kafkaSrMapper::toDto)
|
||||
.map(ResponseEntity::ok)
|
||||
).doOnEach(sig -> auditService.audit(context, sig));
|
||||
).doOnEach(sig -> audit(context, sig));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -239,12 +234,12 @@ public class SchemasController extends AbstractController implements SchemasApi
|
|||
List<String> subjectsToRender = filteredSubjects.stream()
|
||||
.skip(subjectToSkip)
|
||||
.limit(pageSize)
|
||||
.collect(Collectors.toList());
|
||||
.toList();
|
||||
return schemaRegistryService.getAllLatestVersionSchemas(getCluster(clusterName), subjectsToRender)
|
||||
.map(subjs -> subjs.stream().map(kafkaSrMapper::toDto).toList())
|
||||
.map(subjs -> new SchemaSubjectsResponseDTO().pageCount(totalPages).schemas(subjs));
|
||||
}).map(ResponseEntity::ok)
|
||||
.doOnEach(sig -> auditService.audit(context, sig));
|
||||
.doOnEach(sig -> audit(context, sig));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -257,14 +252,14 @@ public class SchemasController extends AbstractController implements SchemasApi
|
|||
.operationName("updateGlobalSchemaCompatibilityLevel")
|
||||
.build();
|
||||
|
||||
return accessControlService.validateAccess(context).then(
|
||||
return validateAccess(context).then(
|
||||
compatibilityLevelMono
|
||||
.flatMap(compatibilityLevelDTO ->
|
||||
schemaRegistryService.updateGlobalSchemaCompatibility(
|
||||
getCluster(clusterName),
|
||||
kafkaSrMapper.fromDto(compatibilityLevelDTO.getCompatibility())
|
||||
))
|
||||
.doOnEach(sig -> auditService.audit(context, sig))
|
||||
.doOnEach(sig -> audit(context, sig))
|
||||
.thenReturn(ResponseEntity.ok().build())
|
||||
);
|
||||
}
|
||||
|
@ -280,7 +275,7 @@ public class SchemasController extends AbstractController implements SchemasApi
|
|||
.operationParams(Map.of("subject", subject))
|
||||
.build();
|
||||
|
||||
return accessControlService.validateAccess(context).then(
|
||||
return validateAccess(context).then(
|
||||
compatibilityLevelMono
|
||||
.flatMap(compatibilityLevelDTO ->
|
||||
schemaRegistryService.updateSchemaCompatibility(
|
||||
|
@ -288,7 +283,7 @@ public class SchemasController extends AbstractController implements SchemasApi
|
|||
subject,
|
||||
kafkaSrMapper.fromDto(compatibilityLevelDTO.getCompatibility())
|
||||
))
|
||||
.doOnEach(sig -> auditService.audit(context, sig))
|
||||
.doOnEach(sig -> audit(context, sig))
|
||||
.thenReturn(ResponseEntity.ok().build())
|
||||
);
|
||||
}
@ -22,13 +22,12 @@ import com.provectus.kafka.ui.model.TopicConfigDTO;
|
|||
import com.provectus.kafka.ui.model.TopicCreationDTO;
|
||||
import com.provectus.kafka.ui.model.TopicDTO;
|
||||
import com.provectus.kafka.ui.model.TopicDetailsDTO;
|
||||
import com.provectus.kafka.ui.model.TopicProducerStateDTO;
|
||||
import com.provectus.kafka.ui.model.TopicUpdateDTO;
|
||||
import com.provectus.kafka.ui.model.TopicsResponseDTO;
|
||||
import com.provectus.kafka.ui.model.rbac.AccessContext;
|
||||
import com.provectus.kafka.ui.service.TopicsService;
|
||||
import com.provectus.kafka.ui.service.analyze.TopicAnalysisService;
|
||||
import com.provectus.kafka.ui.service.audit.AuditService;
|
||||
import com.provectus.kafka.ui.service.rbac.AccessControlService;
|
||||
import java.util.Comparator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
@ -53,8 +52,6 @@ public class TopicsController extends AbstractController implements TopicsApi {
|
|||
private final TopicsService topicsService;
|
||||
private final TopicAnalysisService topicAnalysisService;
|
||||
private final ClusterMapper clusterMapper;
|
||||
private final AccessControlService accessControlService;
|
||||
private final AuditService auditService;
|
||||
|
||||
@Override
|
||||
public Mono<ResponseEntity<TopicDTO>> createTopic(
|
||||
|
@ -67,12 +64,12 @@ public class TopicsController extends AbstractController implements TopicsApi {
|
|||
.operationParams(topicCreation)
|
||||
.build();
|
||||
|
||||
return accessControlService.validateAccess(context)
|
||||
return validateAccess(context)
|
||||
.then(topicsService.createTopic(getCluster(clusterName), topicCreation))
|
||||
.map(clusterMapper::toTopic)
|
||||
.map(s -> new ResponseEntity<>(s, HttpStatus.OK))
|
||||
.switchIfEmpty(Mono.just(ResponseEntity.notFound().build()))
|
||||
.doOnEach(sig -> auditService.audit(context, sig));
|
||||
.doOnEach(sig -> audit(context, sig));
|
||||
});
|
||||
}
|
||||
|
||||
|
@ -86,11 +83,11 @@ public class TopicsController extends AbstractController implements TopicsApi {
|
|||
.operationName("recreateTopic")
|
||||
.build();
|
||||
|
||||
return accessControlService.validateAccess(context).then(
|
||||
return validateAccess(context).then(
|
||||
topicsService.recreateTopic(getCluster(clusterName), topicName)
|
||||
.map(clusterMapper::toTopic)
|
||||
.map(s -> new ResponseEntity<>(s, HttpStatus.CREATED))
|
||||
).doOnEach(sig -> auditService.audit(context, sig));
|
||||
).doOnEach(sig -> audit(context, sig));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -105,11 +102,11 @@ public class TopicsController extends AbstractController implements TopicsApi {
|
|||
.operationParams(Map.of("newTopicName", newTopicName))
|
||||
.build();
|
||||
|
||||
return accessControlService.validateAccess(context)
|
||||
return validateAccess(context)
|
||||
.then(topicsService.cloneTopic(getCluster(clusterName), topicName, newTopicName)
|
||||
.map(clusterMapper::toTopic)
|
||||
.map(s -> new ResponseEntity<>(s, HttpStatus.CREATED))
|
||||
).doOnEach(sig -> auditService.audit(context, sig));
|
||||
).doOnEach(sig -> audit(context, sig));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -123,11 +120,11 @@ public class TopicsController extends AbstractController implements TopicsApi {
|
|||
.operationName("deleteTopic")
|
||||
.build();
|
||||
|
||||
return accessControlService.validateAccess(context)
|
||||
return validateAccess(context)
|
||||
.then(
|
||||
topicsService.deleteTopic(getCluster(clusterName), topicName)
|
||||
.thenReturn(ResponseEntity.ok().<Void>build())
|
||||
).doOnEach(sig -> auditService.audit(context, sig));
|
||||
).doOnEach(sig -> audit(context, sig));
|
||||
}
|
||||
|
||||
|
||||
|
@ -142,15 +139,15 @@ public class TopicsController extends AbstractController implements TopicsApi {
|
|||
.operationName("getTopicConfigs")
|
||||
.build();
|
||||
|
||||
return accessControlService.validateAccess(context).then(
|
||||
return validateAccess(context).then(
|
||||
topicsService.getTopicConfigs(getCluster(clusterName), topicName)
|
||||
.map(lst -> lst.stream()
|
||||
.map(InternalTopicConfig::from)
|
||||
.map(clusterMapper::toTopicConfig)
|
||||
.collect(toList()))
|
||||
.toList())
|
||||
.map(Flux::fromIterable)
|
||||
.map(ResponseEntity::ok)
|
||||
).doOnEach(sig -> auditService.audit(context, sig));
|
||||
).doOnEach(sig -> audit(context, sig));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -164,11 +161,11 @@ public class TopicsController extends AbstractController implements TopicsApi {
|
|||
.operationName("getTopicDetails")
|
||||
.build();
|
||||
|
||||
return accessControlService.validateAccess(context).then(
|
||||
return validateAccess(context).then(
|
||||
topicsService.getTopicDetails(getCluster(clusterName), topicName)
|
||||
.map(clusterMapper::toTopicDetails)
|
||||
.map(ResponseEntity::ok)
|
||||
).doOnEach(sig -> auditService.audit(context, sig));
|
||||
).doOnEach(sig -> audit(context, sig));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -211,11 +208,11 @@ public class TopicsController extends AbstractController implements TopicsApi {
|
|||
return topicsService.loadTopics(getCluster(clusterName), topicsPage)
|
||||
.map(topicsToRender ->
|
||||
new TopicsResponseDTO()
|
||||
.topics(topicsToRender.stream().map(clusterMapper::toTopic).collect(toList()))
|
||||
.topics(topicsToRender.stream().map(clusterMapper::toTopic).toList())
|
||||
.pageCount(totalPages));
|
||||
})
|
||||
.map(ResponseEntity::ok)
|
||||
.doOnEach(sig -> auditService.audit(context, sig));
|
||||
.doOnEach(sig -> audit(context, sig));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -230,12 +227,12 @@ public class TopicsController extends AbstractController implements TopicsApi {
|
|||
.operationName("updateTopic")
|
||||
.build();
|
||||
|
||||
return accessControlService.validateAccess(context).then(
|
||||
return validateAccess(context).then(
|
||||
topicsService
|
||||
.updateTopic(getCluster(clusterName), topicName, topicUpdate)
|
||||
.map(clusterMapper::toTopic)
|
||||
.map(ResponseEntity::ok)
|
||||
).doOnEach(sig -> auditService.audit(context, sig));
|
||||
).doOnEach(sig -> audit(context, sig));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -250,11 +247,11 @@ public class TopicsController extends AbstractController implements TopicsApi {
|
|||
.topicActions(VIEW, EDIT)
|
||||
.build();
|
||||
|
||||
return accessControlService.validateAccess(context).then(
|
||||
return validateAccess(context).then(
|
||||
partitionsIncrease.flatMap(partitions ->
|
||||
topicsService.increaseTopicPartitions(getCluster(clusterName), topicName, partitions)
|
||||
).map(ResponseEntity::ok)
|
||||
).doOnEach(sig -> auditService.audit(context, sig));
|
||||
).doOnEach(sig -> audit(context, sig));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -270,12 +267,12 @@ public class TopicsController extends AbstractController implements TopicsApi {
|
|||
.operationName("changeReplicationFactor")
|
||||
.build();
|
||||
|
||||
return accessControlService.validateAccess(context).then(
|
||||
return validateAccess(context).then(
|
||||
replicationFactorChange
|
||||
.flatMap(rfc ->
|
||||
topicsService.changeReplicationFactor(getCluster(clusterName), topicName, rfc))
|
||||
.map(ResponseEntity::ok)
|
||||
).doOnEach(sig -> auditService.audit(context, sig));
|
||||
).doOnEach(sig -> audit(context, sig));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -288,9 +285,9 @@ public class TopicsController extends AbstractController implements TopicsApi {
|
|||
.operationName("analyzeTopic")
|
||||
.build();
|
||||
|
||||
return accessControlService.validateAccess(context).then(
|
||||
return validateAccess(context).then(
|
||||
topicAnalysisService.analyze(getCluster(clusterName), topicName)
|
||||
.doOnEach(sig -> auditService.audit(context, sig))
|
||||
.doOnEach(sig -> audit(context, sig))
|
||||
.thenReturn(ResponseEntity.ok().build())
|
||||
);
|
||||
}
|
||||
|
@ -305,9 +302,9 @@ public class TopicsController extends AbstractController implements TopicsApi {
|
|||
.operationName("cancelTopicAnalysis")
|
||||
.build();
|
||||
|
||||
return accessControlService.validateAccess(context)
|
||||
return validateAccess(context)
|
||||
.then(Mono.fromRunnable(() -> topicAnalysisService.cancelAnalysis(getCluster(clusterName), topicName)))
|
||||
.doOnEach(sig -> auditService.audit(context, sig))
|
||||
.doOnEach(sig -> audit(context, sig))
|
||||
.thenReturn(ResponseEntity.ok().build());
|
||||
}
|
||||
|
||||
|
@ -324,11 +321,39 @@ public class TopicsController extends AbstractController implements TopicsApi {
|
|||
.operationName("getTopicAnalysis")
|
||||
.build();
|
||||
|
||||
return accessControlService.validateAccess(context)
|
||||
return validateAccess(context)
|
||||
.thenReturn(topicAnalysisService.getTopicAnalysis(getCluster(clusterName), topicName)
|
||||
.map(ResponseEntity::ok)
|
||||
.orElseGet(() -> ResponseEntity.notFound().build()))
|
||||
.doOnEach(sig -> auditService.audit(context, sig));
|
||||
.doOnEach(sig -> audit(context, sig));
|
||||
}
|
||||
|
||||
@Override
|
||||
public Mono<ResponseEntity<Flux<TopicProducerStateDTO>>> getActiveProducerStates(String clusterName,
|
||||
String topicName,
|
||||
ServerWebExchange exchange) {
|
||||
var context = AccessContext.builder()
|
||||
.cluster(clusterName)
|
||||
.topic(topicName)
|
||||
.topicActions(VIEW)
|
||||
.operationName("getActiveProducerStates")
|
||||
.build();
|
||||
|
||||
Comparator<TopicProducerStateDTO> ordering =
|
||||
Comparator.comparingInt(TopicProducerStateDTO::getPartition)
|
||||
.thenComparing(Comparator.comparing(TopicProducerStateDTO::getProducerId).reversed());
|
||||
|
||||
Flux<TopicProducerStateDTO> states = topicsService.getActiveProducersState(getCluster(clusterName), topicName)
|
||||
.flatMapMany(statesMap ->
|
||||
Flux.fromStream(
|
||||
statesMap.entrySet().stream()
|
||||
.flatMap(e -> e.getValue().stream().map(p -> clusterMapper.map(e.getKey().partition(), p)))
|
||||
.sorted(ordering)));
|
||||
|
||||
return validateAccess(context)
|
||||
.thenReturn(states)
|
||||
.map(ResponseEntity::ok)
|
||||
.doOnEach(sig -> audit(context, sig));
|
||||
}
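The ordering used in getActiveProducerStates above lists states by partition (ascending) and, within a partition, newest producerId first. A tiny illustration, assuming the generated DTO exposes fluent setters (the values are invented):

// Illustrative only: the sample values and fluent setters are assumptions.
List<TopicProducerStateDTO> sample = new ArrayList<>(List.of(
    new TopicProducerStateDTO().partition(1).producerId(7L),
    new TopicProducerStateDTO().partition(0).producerId(3L),
    new TopicProducerStateDTO().partition(0).producerId(9L)));
sample.sort(ordering);
// result: partition 0 / producerId 9, partition 0 / producerId 3, partition 1 / producerId 7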
|
||||
|
||||
private Comparator<InternalTopic> getComparatorForTopic(
@@ -1,28 +1,22 @@
 package com.provectus.kafka.ui.emitter;
 
 import com.provectus.kafka.ui.model.TopicMessageEventDTO;
-import java.time.Duration;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.utils.Bytes;
 import reactor.core.publisher.FluxSink;
 
-public abstract class AbstractEmitter {
+abstract class AbstractEmitter implements java.util.function.Consumer<FluxSink<TopicMessageEventDTO>> {
 
   private final MessagesProcessing messagesProcessing;
-  protected final PollingSettings pollingSettings;
+  private final PollingSettings pollingSettings;
 
   protected AbstractEmitter(MessagesProcessing messagesProcessing, PollingSettings pollingSettings) {
     this.messagesProcessing = messagesProcessing;
     this.pollingSettings = pollingSettings;
   }
 
-  protected PolledRecords poll(
-      FluxSink<TopicMessageEventDTO> sink, EnhancedConsumer consumer) {
-    return poll(sink, consumer, pollingSettings.getPollTimeout());
-  }
-
-  protected PolledRecords poll(FluxSink<TopicMessageEventDTO> sink, EnhancedConsumer consumer, Duration timeout) {
-    var records = consumer.pollEnhanced(timeout);
+  protected PolledRecords poll(FluxSink<TopicMessageEventDTO> sink, EnhancedConsumer consumer) {
+    var records = consumer.pollEnhanced(pollingSettings.getPollTimeout());
     sendConsuming(sink, records);
     return records;
   }

@@ -31,9 +25,8 @@ public abstract class AbstractEmitter {
     return messagesProcessing.limitReached();
   }
 
-  protected void sendMessage(FluxSink<TopicMessageEventDTO> sink,
-                             ConsumerRecord<Bytes, Bytes> msg) {
-    messagesProcessing.sendMsg(sink, msg);
+  protected void send(FluxSink<TopicMessageEventDTO> sink, Iterable<ConsumerRecord<Bytes, Bytes>> records) {
+    messagesProcessing.send(sink, records);
   }
 
   protected void sendPhase(FluxSink<TopicMessageEventDTO> sink, String name) {
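Every emitter now extends AbstractEmitter, which itself implements Consumer<FluxSink<TopicMessageEventDTO>>, so the calling service can hand an emitter straight to Flux.create. A rough usage sketch (the variable names are placeholders, not the actual MessagesService code):

// Sketch only: consumerSupplier, position, pageSize, deserializer and filter are placeholders.
Flux<TopicMessageEventDTO> events = Flux.create(
    new ForwardEmitter(consumerSupplier, position, pageSize, deserializer, filter,
        PollingSettings.createDefault()));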
@ -0,0 +1,60 @@
|
|||
package com.provectus.kafka.ui.emitter;
|
||||
|
||||
import com.provectus.kafka.ui.model.ConsumerPosition;
|
||||
import com.provectus.kafka.ui.model.TopicMessageDTO;
|
||||
import com.provectus.kafka.ui.serdes.ConsumerRecordDeserializer;
|
||||
import java.util.Comparator;
|
||||
import java.util.Map;
|
||||
import java.util.TreeMap;
|
||||
import java.util.function.Predicate;
|
||||
import java.util.function.Supplier;
|
||||
import java.util.stream.Collectors;
|
||||
import org.apache.kafka.common.TopicPartition;
|
||||
|
||||
public class BackwardEmitter extends RangePollingEmitter {
|
||||
|
||||
public BackwardEmitter(Supplier<EnhancedConsumer> consumerSupplier,
|
||||
ConsumerPosition consumerPosition,
|
||||
int messagesPerPage,
|
||||
ConsumerRecordDeserializer deserializer,
|
||||
Predicate<TopicMessageDTO> filter,
|
||||
PollingSettings pollingSettings) {
|
||||
super(
|
||||
consumerSupplier,
|
||||
consumerPosition,
|
||||
messagesPerPage,
|
||||
new MessagesProcessing(
|
||||
deserializer,
|
||||
filter,
|
||||
false,
|
||||
messagesPerPage
|
||||
),
|
||||
pollingSettings
|
||||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected TreeMap<TopicPartition, FromToOffset> nextPollingRange(TreeMap<TopicPartition, FromToOffset> prevRange,
|
||||
SeekOperations seekOperations) {
|
||||
TreeMap<TopicPartition, Long> readToOffsets = new TreeMap<>(Comparator.comparingInt(TopicPartition::partition));
|
||||
if (prevRange.isEmpty()) {
|
||||
readToOffsets.putAll(seekOperations.getOffsetsForSeek());
|
||||
} else {
|
||||
readToOffsets.putAll(
|
||||
prevRange.entrySet()
|
||||
.stream()
|
||||
.collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().from()))
|
||||
);
|
||||
}
|
||||
|
||||
int msgsToPollPerPartition = (int) Math.ceil((double) messagesPerPage / readToOffsets.size());
|
||||
TreeMap<TopicPartition, FromToOffset> result = new TreeMap<>(Comparator.comparingInt(TopicPartition::partition));
|
||||
readToOffsets.forEach((tp, toOffset) -> {
|
||||
long tpStartOffset = seekOperations.getBeginOffsets().get(tp);
|
||||
if (toOffset > tpStartOffset) {
|
||||
result.put(tp, new FromToOffset(Math.max(tpStartOffset, toOffset - msgsToPollPerPartition), toOffset));
|
||||
}
|
||||
});
|
||||
return result;
|
||||
}
|
||||
}
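A worked example of nextPollingRange for the backward direction, with made-up numbers: messagesPerPage = 100 over two partitions gives msgsToPollPerPartition = ceil(100 / 2) = 50.

// First call: prevRange is empty, so the seek offsets are the 'read up to' positions, say
//   tp-0 -> 900 (begin offset 0), tp-1 -> 120 (begin offset 100)
// Resulting range: tp-0 -> [850, 900), tp-1 -> [100, 120)   (tp-1 is clamped at its begin offset)
// Next call: reading continues from the previous 'from' values (900 -> 850, 120 -> 100);
// tp-1 has reached its begin offset (toOffset == tpStartOffset), so it drops out of the range.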
|
|
@ -1,126 +0,0 @@
|
|||
package com.provectus.kafka.ui.emitter;
|
||||
|
||||
import com.provectus.kafka.ui.model.ConsumerPosition;
|
||||
import com.provectus.kafka.ui.model.TopicMessageEventDTO;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.Comparator;
|
||||
import java.util.List;
|
||||
import java.util.TreeMap;
|
||||
import java.util.function.Supplier;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.kafka.clients.consumer.ConsumerRecord;
|
||||
import org.apache.kafka.common.TopicPartition;
|
||||
import org.apache.kafka.common.errors.InterruptException;
|
||||
import org.apache.kafka.common.utils.Bytes;
|
||||
import reactor.core.publisher.FluxSink;
|
||||
|
||||
@Slf4j
|
||||
public class BackwardRecordEmitter
|
||||
extends AbstractEmitter
|
||||
implements java.util.function.Consumer<FluxSink<TopicMessageEventDTO>> {
|
||||
|
||||
private final Supplier<EnhancedConsumer> consumerSupplier;
|
||||
private final ConsumerPosition consumerPosition;
|
||||
private final int messagesPerPage;
|
||||
|
||||
public BackwardRecordEmitter(
|
||||
Supplier<EnhancedConsumer> consumerSupplier,
|
||||
ConsumerPosition consumerPosition,
|
||||
int messagesPerPage,
|
||||
MessagesProcessing messagesProcessing,
|
||||
PollingSettings pollingSettings) {
|
||||
super(messagesProcessing, pollingSettings);
|
||||
this.consumerPosition = consumerPosition;
|
||||
this.messagesPerPage = messagesPerPage;
|
||||
this.consumerSupplier = consumerSupplier;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void accept(FluxSink<TopicMessageEventDTO> sink) {
|
||||
log.debug("Starting backward polling for {}", consumerPosition);
|
||||
try (EnhancedConsumer consumer = consumerSupplier.get()) {
|
||||
sendPhase(sink, "Created consumer");
|
||||
|
||||
var seekOperations = SeekOperations.create(consumer, consumerPosition);
|
||||
var readUntilOffsets = new TreeMap<TopicPartition, Long>(Comparator.comparingInt(TopicPartition::partition));
|
||||
readUntilOffsets.putAll(seekOperations.getOffsetsForSeek());
|
||||
|
||||
int msgsToPollPerPartition = (int) Math.ceil((double) messagesPerPage / readUntilOffsets.size());
|
||||
log.debug("'Until' offsets for polling: {}", readUntilOffsets);
|
||||
|
||||
while (!sink.isCancelled() && !readUntilOffsets.isEmpty() && !sendLimitReached()) {
|
||||
new TreeMap<>(readUntilOffsets).forEach((tp, readToOffset) -> {
|
||||
if (sink.isCancelled()) {
|
||||
return; //fast return in case of sink cancellation
|
||||
}
|
||||
long beginOffset = seekOperations.getBeginOffsets().get(tp);
|
||||
long readFromOffset = Math.max(beginOffset, readToOffset - msgsToPollPerPartition);
|
||||
|
||||
partitionPollIteration(tp, readFromOffset, readToOffset, consumer, sink)
|
||||
.forEach(r -> sendMessage(sink, r));
|
||||
|
||||
if (beginOffset == readFromOffset) {
|
||||
// we fully read this partition -> removing it from polling iterations
|
||||
readUntilOffsets.remove(tp);
|
||||
} else {
|
||||
// updating 'to' offset for next polling iteration
|
||||
readUntilOffsets.put(tp, readFromOffset);
|
||||
}
|
||||
});
|
||||
if (readUntilOffsets.isEmpty()) {
|
||||
log.debug("begin reached after partitions poll iteration");
|
||||
} else if (sink.isCancelled()) {
|
||||
log.debug("sink is cancelled after partitions poll iteration");
|
||||
}
|
||||
}
|
||||
sendFinishStatsAndCompleteSink(sink);
|
||||
log.debug("Polling finished");
|
||||
} catch (InterruptException kafkaInterruptException) {
|
||||
log.debug("Polling finished due to thread interruption");
|
||||
sink.complete();
|
||||
} catch (Exception e) {
|
||||
log.error("Error occurred while consuming records", e);
|
||||
sink.error(e);
|
||||
}
|
||||
}
|
||||
|
||||
private List<ConsumerRecord<Bytes, Bytes>> partitionPollIteration(
|
||||
TopicPartition tp,
|
||||
long fromOffset,
|
||||
long toOffset,
|
||||
EnhancedConsumer consumer,
|
||||
FluxSink<TopicMessageEventDTO> sink
|
||||
) {
|
||||
consumer.assign(Collections.singleton(tp));
|
||||
consumer.seek(tp, fromOffset);
|
||||
sendPhase(sink, String.format("Polling partition: %s from offset %s", tp, fromOffset));
|
||||
int desiredMsgsToPoll = (int) (toOffset - fromOffset);
|
||||
|
||||
var recordsToSend = new ArrayList<ConsumerRecord<Bytes, Bytes>>();
|
||||
|
||||
EmptyPollsCounter emptyPolls = pollingSettings.createEmptyPollsCounter();
|
||||
while (!sink.isCancelled()
|
||||
&& !sendLimitReached()
|
||||
&& recordsToSend.size() < desiredMsgsToPoll
|
||||
&& !emptyPolls.noDataEmptyPollsReached()) {
|
||||
var polledRecords = poll(sink, consumer, pollingSettings.getPartitionPollTimeout());
|
||||
emptyPolls.count(polledRecords.count());
|
||||
|
||||
log.debug("{} records polled from {}", polledRecords.count(), tp);
|
||||
|
||||
var filteredRecords = polledRecords.records(tp).stream()
|
||||
.filter(r -> r.offset() < toOffset)
|
||||
.toList();
|
||||
|
||||
if (polledRecords.count() > 0 && filteredRecords.isEmpty()) {
|
||||
// we already read all messages in target offsets interval
|
||||
break;
|
||||
}
|
||||
recordsToSend.addAll(filteredRecords);
|
||||
}
|
||||
log.debug("{} records to send", recordsToSend.size());
|
||||
Collections.reverse(recordsToSend);
|
||||
return recordsToSend;
|
||||
}
|
||||
}
|
|
@@ -9,35 +9,37 @@ class ConsumingStats {
   private long bytes = 0;
   private int records = 0;
   private long elapsed = 0;
+  private int filterApplyErrors = 0;
 
-  void sendConsumingEvt(FluxSink<TopicMessageEventDTO> sink,
-                        PolledRecords polledRecords,
-                        int filterApplyErrors) {
+  void sendConsumingEvt(FluxSink<TopicMessageEventDTO> sink, PolledRecords polledRecords) {
     bytes += polledRecords.bytes();
-    this.records += polledRecords.count();
-    this.elapsed += polledRecords.elapsed().toMillis();
+    records += polledRecords.count();
+    elapsed += polledRecords.elapsed().toMillis();
     sink.next(
         new TopicMessageEventDTO()
             .type(TopicMessageEventDTO.TypeEnum.CONSUMING)
-            .consuming(createConsumingStats(sink, filterApplyErrors))
+            .consuming(createConsumingStats())
     );
   }
 
-  void sendFinishEvent(FluxSink<TopicMessageEventDTO> sink, int filterApplyErrors) {
+  void incFilterApplyError() {
+    filterApplyErrors++;
+  }
+
+  void sendFinishEvent(FluxSink<TopicMessageEventDTO> sink) {
     sink.next(
         new TopicMessageEventDTO()
             .type(TopicMessageEventDTO.TypeEnum.DONE)
-            .consuming(createConsumingStats(sink, filterApplyErrors))
+            .consuming(createConsumingStats())
     );
   }
 
-  private TopicMessageConsumingDTO createConsumingStats(FluxSink<TopicMessageEventDTO> sink,
-                                                        int filterApplyErrors) {
+  private TopicMessageConsumingDTO createConsumingStats() {
    return new TopicMessageConsumingDTO()
-        .bytesConsumed(this.bytes)
-        .elapsedMs(this.elapsed)
-        .isCancelled(sink.isCancelled())
+        .bytesConsumed(bytes)
+        .elapsedMs(elapsed)
+        .isCancelled(false)
         .filterApplyErrors(filterApplyErrors)
-        .messagesConsumed(this.records);
+        .messagesConsumed(records);
   }
 }
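The stats object is cumulative: each poll emits a CONSUMING event carrying the running totals, and the final DONE event repeats them. For example (made-up numbers), after two polls of 10 and 15 records totalling 4 KB and 6 KB, the second CONSUMING event reports messagesConsumed = 25 and bytesConsumed = 10240, and the closing DONE event carries the same totals plus whatever filterApplyErrors were counted via incFilterApplyError().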
|
|
|
@@ -1,28 +0,0 @@
package com.provectus.kafka.ui.emitter;

import org.apache.kafka.clients.consumer.ConsumerRecords;

// In some situations it is hard to say whether records range (between two offsets) was fully polled.
// This happens when we have holes in records sequences that is usual case for compact topics or
// topics with transactional writes. In such cases if you want to poll all records between offsets X and Y
// there is no guarantee that you will ever see record with offset Y.
// To workaround this we can assume that after N consecutive empty polls all target messages were read.
public class EmptyPollsCounter {

  private final int maxEmptyPolls;

  private int emptyPolls = 0;

  EmptyPollsCounter(int maxEmptyPolls) {
    this.maxEmptyPolls = maxEmptyPolls;
  }

  public void count(int polledCount) {
    emptyPolls = polledCount == 0 ? emptyPolls + 1 : 0;
  }

  public boolean noDataEmptyPollsReached() {
    return emptyPolls >= maxEmptyPolls;
  }

}
|
|
@ -0,0 +1,61 @@
|
|||
package com.provectus.kafka.ui.emitter;
|
||||
|
||||
import com.provectus.kafka.ui.model.ConsumerPosition;
|
||||
import com.provectus.kafka.ui.model.TopicMessageDTO;
|
||||
import com.provectus.kafka.ui.serdes.ConsumerRecordDeserializer;
|
||||
import java.util.Comparator;
|
||||
import java.util.Map;
|
||||
import java.util.TreeMap;
|
||||
import java.util.function.Predicate;
|
||||
import java.util.function.Supplier;
|
||||
import java.util.stream.Collectors;
|
||||
import org.apache.kafka.common.TopicPartition;
|
||||
|
||||
public class ForwardEmitter extends RangePollingEmitter {
|
||||
|
||||
public ForwardEmitter(Supplier<EnhancedConsumer> consumerSupplier,
|
||||
ConsumerPosition consumerPosition,
|
||||
int messagesPerPage,
|
||||
ConsumerRecordDeserializer deserializer,
|
||||
Predicate<TopicMessageDTO> filter,
|
||||
PollingSettings pollingSettings) {
|
||||
super(
|
||||
consumerSupplier,
|
||||
consumerPosition,
|
||||
messagesPerPage,
|
||||
new MessagesProcessing(
|
||||
deserializer,
|
||||
filter,
|
||||
true,
|
||||
messagesPerPage
|
||||
),
|
||||
pollingSettings
|
||||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected TreeMap<TopicPartition, FromToOffset> nextPollingRange(TreeMap<TopicPartition, FromToOffset> prevRange,
|
||||
SeekOperations seekOperations) {
|
||||
TreeMap<TopicPartition, Long> readFromOffsets = new TreeMap<>(Comparator.comparingInt(TopicPartition::partition));
|
||||
if (prevRange.isEmpty()) {
|
||||
readFromOffsets.putAll(seekOperations.getOffsetsForSeek());
|
||||
} else {
|
||||
readFromOffsets.putAll(
|
||||
prevRange.entrySet()
|
||||
.stream()
|
||||
.collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().to()))
|
||||
);
|
||||
}
|
||||
|
||||
int msgsToPollPerPartition = (int) Math.ceil((double) messagesPerPage / readFromOffsets.size());
|
||||
TreeMap<TopicPartition, FromToOffset> result = new TreeMap<>(Comparator.comparingInt(TopicPartition::partition));
|
||||
readFromOffsets.forEach((tp, fromOffset) -> {
|
||||
long tpEndOffset = seekOperations.getEndOffsets().get(tp);
|
||||
if (fromOffset < tpEndOffset) {
|
||||
result.put(tp, new FromToOffset(fromOffset, Math.min(tpEndOffset, fromOffset + msgsToPollPerPartition)));
|
||||
}
|
||||
});
|
||||
return result;
|
||||
}
|
||||
|
||||
}
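ForwardEmitter mirrors BackwardEmitter but walks from the previous range's 'to' offsets toward each partition's end offset. With the same made-up numbers (two partitions, messagesPerPage = 100, so 50 records per partition per step):

// A partition with fromOffset 200 and end offset 230 yields the range [200, 230),
// i.e. min(230, 200 + 50) caps the range at the end offset; once fromOffset reaches
// the end offset the partition is excluded from the next range.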
|
|
@ -1,64 +0,0 @@
|
|||
package com.provectus.kafka.ui.emitter;
|
||||
|
||||
import com.provectus.kafka.ui.model.ConsumerPosition;
|
||||
import com.provectus.kafka.ui.model.TopicMessageEventDTO;
|
||||
import java.util.function.Supplier;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.kafka.clients.consumer.ConsumerRecord;
|
||||
import org.apache.kafka.common.errors.InterruptException;
|
||||
import org.apache.kafka.common.utils.Bytes;
|
||||
import reactor.core.publisher.FluxSink;
|
||||
|
||||
@Slf4j
|
||||
public class ForwardRecordEmitter
|
||||
extends AbstractEmitter
|
||||
implements java.util.function.Consumer<FluxSink<TopicMessageEventDTO>> {
|
||||
|
||||
private final Supplier<EnhancedConsumer> consumerSupplier;
|
||||
private final ConsumerPosition position;
|
||||
|
||||
public ForwardRecordEmitter(
|
||||
Supplier<EnhancedConsumer> consumerSupplier,
|
||||
ConsumerPosition position,
|
||||
MessagesProcessing messagesProcessing,
|
||||
PollingSettings pollingSettings) {
|
||||
super(messagesProcessing, pollingSettings);
|
||||
this.position = position;
|
||||
this.consumerSupplier = consumerSupplier;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void accept(FluxSink<TopicMessageEventDTO> sink) {
|
||||
log.debug("Starting forward polling for {}", position);
|
||||
try (EnhancedConsumer consumer = consumerSupplier.get()) {
|
||||
sendPhase(sink, "Assigning partitions");
|
||||
var seekOperations = SeekOperations.create(consumer, position);
|
||||
seekOperations.assignAndSeekNonEmptyPartitions();
|
||||
|
||||
EmptyPollsCounter emptyPolls = pollingSettings.createEmptyPollsCounter();
|
||||
while (!sink.isCancelled()
|
||||
&& !sendLimitReached()
|
||||
&& !seekOperations.assignedPartitionsFullyPolled()
|
||||
&& !emptyPolls.noDataEmptyPollsReached()) {
|
||||
|
||||
sendPhase(sink, "Polling");
|
||||
var records = poll(sink, consumer);
|
||||
emptyPolls.count(records.count());
|
||||
|
||||
log.debug("{} records polled", records.count());
|
||||
|
||||
for (ConsumerRecord<Bytes, Bytes> msg : records) {
|
||||
sendMessage(sink, msg);
|
||||
}
|
||||
}
|
||||
sendFinishStatsAndCompleteSink(sink);
|
||||
log.debug("Polling finished");
|
||||
} catch (InterruptException kafkaInterruptException) {
|
||||
log.debug("Polling finished due to thread interruption");
|
||||
sink.complete();
|
||||
} catch (Exception e) {
|
||||
log.error("Error occurred while consuming records", e);
|
||||
sink.error(e);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -1,67 +1,75 @@
|
|||
package com.provectus.kafka.ui.emitter;
|
||||
|
||||
import static java.util.stream.Collectors.collectingAndThen;
|
||||
import static java.util.stream.Collectors.groupingBy;
|
||||
import static java.util.stream.Collectors.toList;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.collect.Iterables;
|
||||
import com.google.common.collect.Streams;
|
||||
import com.provectus.kafka.ui.model.TopicMessageDTO;
|
||||
import com.provectus.kafka.ui.model.TopicMessageEventDTO;
|
||||
import com.provectus.kafka.ui.model.TopicMessagePhaseDTO;
|
||||
import com.provectus.kafka.ui.serdes.ConsumerRecordDeserializer;
|
||||
import java.util.Comparator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.TreeMap;
|
||||
import java.util.function.Predicate;
|
||||
import javax.annotation.Nullable;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.kafka.clients.consumer.ConsumerRecord;
|
||||
import org.apache.kafka.common.utils.Bytes;
|
||||
import reactor.core.publisher.FluxSink;
|
||||
|
||||
@Slf4j
|
||||
public class MessagesProcessing {
|
||||
@RequiredArgsConstructor
|
||||
class MessagesProcessing {
|
||||
|
||||
private final ConsumingStats consumingStats = new ConsumingStats();
|
||||
private long sentMessages = 0;
|
||||
private int filterApplyErrors = 0;
|
||||
|
||||
private final ConsumerRecordDeserializer deserializer;
|
||||
private final Predicate<TopicMessageDTO> filter;
|
||||
private final boolean ascendingSortBeforeSend;
|
||||
private final @Nullable Integer limit;
|
||||
|
||||
public MessagesProcessing(ConsumerRecordDeserializer deserializer,
|
||||
Predicate<TopicMessageDTO> filter,
|
||||
@Nullable Integer limit) {
|
||||
this.deserializer = deserializer;
|
||||
this.filter = filter;
|
||||
this.limit = limit;
|
||||
}
|
||||
|
||||
boolean limitReached() {
|
||||
return limit != null && sentMessages >= limit;
|
||||
}
|
||||
|
||||
void sendMsg(FluxSink<TopicMessageEventDTO> sink, ConsumerRecord<Bytes, Bytes> rec) {
|
||||
if (!sink.isCancelled() && !limitReached()) {
|
||||
TopicMessageDTO topicMessage = deserializer.deserialize(rec);
|
||||
try {
|
||||
if (filter.test(topicMessage)) {
|
||||
sink.next(
|
||||
new TopicMessageEventDTO()
|
||||
.type(TopicMessageEventDTO.TypeEnum.MESSAGE)
|
||||
.message(topicMessage)
|
||||
);
|
||||
sentMessages++;
|
||||
}
|
||||
} catch (Exception e) {
|
||||
filterApplyErrors++;
|
||||
log.trace("Error applying filter for message {}", topicMessage);
|
||||
}
|
||||
}
|
||||
void send(FluxSink<TopicMessageEventDTO> sink, Iterable<ConsumerRecord<Bytes, Bytes>> polled) {
|
||||
sortForSending(polled, ascendingSortBeforeSend)
|
||||
.forEach(rec -> {
|
||||
if (!limitReached() && !sink.isCancelled()) {
|
||||
TopicMessageDTO topicMessage = deserializer.deserialize(rec);
|
||||
try {
|
||||
if (filter.test(topicMessage)) {
|
||||
sink.next(
|
||||
new TopicMessageEventDTO()
|
||||
.type(TopicMessageEventDTO.TypeEnum.MESSAGE)
|
||||
.message(topicMessage)
|
||||
);
|
||||
sentMessages++;
|
||||
}
|
||||
} catch (Exception e) {
|
||||
consumingStats.incFilterApplyError();
|
||||
log.trace("Error applying filter for message {}", topicMessage);
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
void sentConsumingInfo(FluxSink<TopicMessageEventDTO> sink, PolledRecords polledRecords) {
|
||||
if (!sink.isCancelled()) {
|
||||
consumingStats.sendConsumingEvt(sink, polledRecords, filterApplyErrors);
|
||||
consumingStats.sendConsumingEvt(sink, polledRecords);
|
||||
}
|
||||
}
|
||||
|
||||
void sendFinishEvent(FluxSink<TopicMessageEventDTO> sink) {
|
||||
if (!sink.isCancelled()) {
|
||||
consumingStats.sendFinishEvent(sink, filterApplyErrors);
|
||||
consumingStats.sendFinishEvent(sink);
|
||||
}
|
||||
}
|
||||
|
||||
|
@@ -75,4 +83,30 @@ public class MessagesProcessing {
    }
  }

  /*
   * Sorting by timestamps, BUT requesting that records within same partitions should be ordered by offsets.
   */
  @VisibleForTesting
  static Iterable<ConsumerRecord<Bytes, Bytes>> sortForSending(Iterable<ConsumerRecord<Bytes, Bytes>> records,
                                                               boolean asc) {
    Comparator<ConsumerRecord> offsetComparator = asc
        ? Comparator.comparingLong(ConsumerRecord::offset)
        : Comparator.<ConsumerRecord>comparingLong(ConsumerRecord::offset).reversed();

    // partition -> sorted by offsets records
    Map<Integer, List<ConsumerRecord<Bytes, Bytes>>> perPartition = Streams.stream(records)
        .collect(
            groupingBy(
                ConsumerRecord::partition,
                TreeMap::new,
                collectingAndThen(toList(), lst -> lst.stream().sorted(offsetComparator).toList())));

    Comparator<ConsumerRecord> tsComparator = asc
        ? Comparator.comparing(ConsumerRecord::timestamp)
        : Comparator.<ConsumerRecord>comparingLong(ConsumerRecord::timestamp).reversed();

    // merge-sorting records from partitions one by one using timestamp comparator
    return Iterables.mergeSorted(perPartition.values(), tsComparator);
  }

}
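A small worked example of what sortForSending produces in ascending mode (made-up records shown as partition/offset/timestamp): per-partition order is by offset, and the partitions are then merge-sorted by timestamp.

// Input:          p0/5 (ts=100), p0/6 (ts=110), p1/2 (ts=105)
// Per partition:  p0 -> [offset 5, offset 6],  p1 -> [offset 2]
// Merged output:  p0/5 (ts=100), p1/2 (ts=105), p0/6 (ts=110)
// Within p0 the offset order (5 before 6) is preserved even if timestamps were not monotonic.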
@ -5,15 +5,15 @@ import java.util.Collection;
|
|||
import java.util.HashSet;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.stream.Collectors;
|
||||
import lombok.Getter;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.commons.lang3.mutable.MutableLong;
|
||||
import org.apache.kafka.clients.consumer.Consumer;
|
||||
import org.apache.kafka.common.TopicPartition;
|
||||
|
||||
@Slf4j
|
||||
@Getter
|
||||
public class OffsetsInfo {
|
||||
class OffsetsInfo {
|
||||
|
||||
private final Consumer<?, ?> consumer;
|
||||
|
||||
|
@ -23,16 +23,15 @@ public class OffsetsInfo {
|
|||
private final Set<TopicPartition> nonEmptyPartitions = new HashSet<>();
|
||||
private final Set<TopicPartition> emptyPartitions = new HashSet<>();
|
||||
|
||||
public OffsetsInfo(Consumer<?, ?> consumer, String topic) {
|
||||
OffsetsInfo(Consumer<?, ?> consumer, String topic) {
|
||||
this(consumer,
|
||||
consumer.partitionsFor(topic).stream()
|
||||
.map(pi -> new TopicPartition(topic, pi.partition()))
|
||||
.collect(Collectors.toList())
|
||||
.toList()
|
||||
);
|
||||
}
|
||||
|
||||
public OffsetsInfo(Consumer<?, ?> consumer,
|
||||
Collection<TopicPartition> targetPartitions) {
|
||||
OffsetsInfo(Consumer<?, ?> consumer, Collection<TopicPartition> targetPartitions) {
|
||||
this.consumer = consumer;
|
||||
this.beginOffsets = consumer.beginningOffsets(targetPartitions);
|
||||
this.endOffsets = consumer.endOffsets(targetPartitions);
|
||||
|
@ -46,8 +45,8 @@ public class OffsetsInfo {
|
|||
});
|
||||
}
|
||||
|
||||
public boolean assignedPartitionsFullyPolled() {
|
||||
for (var tp: consumer.assignment()) {
|
||||
boolean assignedPartitionsFullyPolled() {
|
||||
for (var tp : consumer.assignment()) {
|
||||
Preconditions.checkArgument(endOffsets.containsKey(tp));
|
||||
if (endOffsets.get(tp) > consumer.position(tp)) {
|
||||
return false;
|
||||
|
@ -56,4 +55,10 @@ public class OffsetsInfo {
|
|||
return true;
|
||||
}
|
||||
|
||||
long summaryOffsetsRange() {
|
||||
MutableLong cnt = new MutableLong();
|
||||
nonEmptyPartitions.forEach(tp -> cnt.add(endOffsets.get(tp) - beginOffsets.get(tp)));
|
||||
return cnt.getValue();
|
||||
}
|
||||
|
||||
}
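For example (illustrative offsets): with non-empty partitions tp-0 spanning begin 10 to end 60 and tp-1 spanning begin 0 to end 25, summaryOffsetsRange() returns (60 - 10) + (25 - 0) = 75.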
|
||||
|
|
|
@ -8,13 +8,8 @@ import java.util.function.Supplier;
|
|||
public class PollingSettings {
|
||||
|
||||
private static final Duration DEFAULT_POLL_TIMEOUT = Duration.ofMillis(1_000);
|
||||
private static final Duration DEFAULT_PARTITION_POLL_TIMEOUT = Duration.ofMillis(200);
|
||||
private static final int DEFAULT_NO_DATA_EMPTY_POLLS = 3;
|
||||
|
||||
private final Duration pollTimeout;
|
||||
private final Duration partitionPollTimeout;
|
||||
private final int notDataEmptyPolls; //see EmptyPollsCounter docs
|
||||
|
||||
private final Supplier<PollingThrottler> throttlerSupplier;
|
||||
|
||||
public static PollingSettings create(ClustersProperties.Cluster cluster,
|
||||
|
@ -26,18 +21,8 @@ public class PollingSettings {
|
|||
? Duration.ofMillis(pollingProps.getPollTimeoutMs())
|
||||
: DEFAULT_POLL_TIMEOUT;
|
||||
|
||||
var partitionPollTimeout = pollingProps.getPartitionPollTimeout() != null
|
||||
? Duration.ofMillis(pollingProps.getPartitionPollTimeout())
|
||||
: Duration.ofMillis(pollTimeout.toMillis() / 5);
|
||||
|
||||
int noDataEmptyPolls = pollingProps.getNoDataEmptyPolls() != null
|
||||
? pollingProps.getNoDataEmptyPolls()
|
||||
: DEFAULT_NO_DATA_EMPTY_POLLS;
|
||||
|
||||
return new PollingSettings(
|
||||
pollTimeout,
|
||||
partitionPollTimeout,
|
||||
noDataEmptyPolls,
|
||||
PollingThrottler.throttlerSupplier(cluster)
|
||||
);
|
||||
}
|
||||
|
@ -45,34 +30,20 @@ public class PollingSettings {
|
|||
public static PollingSettings createDefault() {
|
||||
return new PollingSettings(
|
||||
DEFAULT_POLL_TIMEOUT,
|
||||
DEFAULT_PARTITION_POLL_TIMEOUT,
|
||||
DEFAULT_NO_DATA_EMPTY_POLLS,
|
||||
PollingThrottler::noop
|
||||
);
|
||||
}
|
||||
|
||||
private PollingSettings(Duration pollTimeout,
|
||||
Duration partitionPollTimeout,
|
||||
int notDataEmptyPolls,
|
||||
Supplier<PollingThrottler> throttlerSupplier) {
|
||||
this.pollTimeout = pollTimeout;
|
||||
this.partitionPollTimeout = partitionPollTimeout;
|
||||
this.notDataEmptyPolls = notDataEmptyPolls;
|
||||
this.throttlerSupplier = throttlerSupplier;
|
||||
}
|
||||
|
||||
public EmptyPollsCounter createEmptyPollsCounter() {
|
||||
return new EmptyPollsCounter(notDataEmptyPolls);
|
||||
}
|
||||
|
||||
public Duration getPollTimeout() {
|
||||
return pollTimeout;
|
||||
}
|
||||
|
||||
public Duration getPartitionPollTimeout() {
|
||||
return partitionPollTimeout;
|
||||
}
|
||||
|
||||
public PollingThrottler getPollingThrottler() {
|
||||
return throttlerSupplier.get();
|
||||
}
|
||||
|
|
|
@ -0,0 +1,98 @@
|
|||
package com.provectus.kafka.ui.emitter;
|
||||
|
||||
import com.provectus.kafka.ui.model.ConsumerPosition;
|
||||
import com.provectus.kafka.ui.model.TopicMessageEventDTO;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.TreeMap;
|
||||
import java.util.function.Supplier;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.kafka.clients.consumer.ConsumerRecord;
|
||||
import org.apache.kafka.common.TopicPartition;
|
||||
import org.apache.kafka.common.errors.InterruptException;
|
||||
import org.apache.kafka.common.utils.Bytes;
|
||||
import reactor.core.publisher.FluxSink;
|
||||
|
||||
@Slf4j
|
||||
abstract class RangePollingEmitter extends AbstractEmitter {
|
||||
|
||||
private final Supplier<EnhancedConsumer> consumerSupplier;
|
||||
protected final ConsumerPosition consumerPosition;
|
||||
protected final int messagesPerPage;
|
||||
|
||||
protected RangePollingEmitter(Supplier<EnhancedConsumer> consumerSupplier,
|
||||
ConsumerPosition consumerPosition,
|
||||
int messagesPerPage,
|
||||
MessagesProcessing messagesProcessing,
|
||||
PollingSettings pollingSettings) {
|
||||
super(messagesProcessing, pollingSettings);
|
||||
this.consumerPosition = consumerPosition;
|
||||
this.messagesPerPage = messagesPerPage;
|
||||
this.consumerSupplier = consumerSupplier;
|
||||
}
|
||||
|
||||
protected record FromToOffset(/*inclusive*/ long from, /*exclusive*/ long to) {
|
||||
}
|
||||
|
||||
//should return empty map if polling should be stopped
|
||||
protected abstract TreeMap<TopicPartition, FromToOffset> nextPollingRange(
|
||||
TreeMap<TopicPartition, FromToOffset> prevRange, //empty on start
|
||||
SeekOperations seekOperations
|
||||
);
|
||||
|
||||
@Override
|
||||
public void accept(FluxSink<TopicMessageEventDTO> sink) {
|
||||
log.debug("Starting polling for {}", consumerPosition);
|
||||
try (EnhancedConsumer consumer = consumerSupplier.get()) {
|
||||
sendPhase(sink, "Consumer created");
|
||||
var seekOperations = SeekOperations.create(consumer, consumerPosition);
|
||||
TreeMap<TopicPartition, FromToOffset> pollRange = nextPollingRange(new TreeMap<>(), seekOperations);
|
||||
log.debug("Starting from offsets {}", pollRange);
|
||||
|
||||
while (!sink.isCancelled() && !pollRange.isEmpty() && !sendLimitReached()) {
|
||||
var polled = poll(consumer, sink, pollRange);
|
||||
send(sink, polled);
|
||||
pollRange = nextPollingRange(pollRange, seekOperations);
|
||||
}
|
||||
if (sink.isCancelled()) {
|
||||
log.debug("Polling finished due to sink cancellation");
|
||||
}
|
||||
sendFinishStatsAndCompleteSink(sink);
|
||||
log.debug("Polling finished");
|
||||
} catch (InterruptException kafkaInterruptException) {
|
||||
log.debug("Polling finished due to thread interruption");
|
||||
sink.complete();
|
||||
} catch (Exception e) {
|
||||
log.error("Error occurred while consuming records", e);
|
||||
sink.error(e);
|
||||
}
|
||||
}
|
||||
|
||||
private List<ConsumerRecord<Bytes, Bytes>> poll(EnhancedConsumer consumer,
|
||||
FluxSink<TopicMessageEventDTO> sink,
|
||||
TreeMap<TopicPartition, FromToOffset> range) {
|
||||
log.trace("Polling range {}", range);
|
||||
sendPhase(sink,
|
||||
"Polling partitions: %s".formatted(range.keySet().stream().map(TopicPartition::partition).sorted().toList()));
|
||||
|
||||
consumer.assign(range.keySet());
|
||||
range.forEach((tp, fromTo) -> consumer.seek(tp, fromTo.from));
|
||||
|
||||
List<ConsumerRecord<Bytes, Bytes>> result = new ArrayList<>();
|
||||
while (!sink.isCancelled() && consumer.paused().size() < range.size()) {
|
||||
var polledRecords = poll(sink, consumer);
|
||||
range.forEach((tp, fromTo) -> {
|
||||
polledRecords.records(tp).stream()
|
||||
.filter(r -> r.offset() < fromTo.to)
|
||||
.forEach(result::add);
|
||||
|
||||
//next position is out of target range -> pausing partition
|
||||
if (consumer.position(tp) >= fromTo.to) {
|
||||
consumer.pause(List.of(tp));
|
||||
}
|
||||
});
|
||||
}
|
||||
consumer.resume(consumer.paused());
|
||||
return result;
|
||||
}
|
||||
}
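The inner poll loop above relies on pausing: a partition is paused as soon as the consumer's position reaches the range's exclusive 'to' offset, the loop exits once every partition in the range is paused (or the sink is cancelled), and the partitions are then resumed so the same consumer can serve the next range. An example trace with made-up offsets:

// range = {tp-0: [100, 120), tp-1: [40, 45)}
// poll #1 returns tp-0 offsets 100..111 and tp-1 offsets 40..44
//   -> tp-1 position is now 45 >= 45, so tp-1 is paused
// poll #2 returns tp-0 offsets 112..123; offsets 120..123 are filtered out (>= 'to')
//   -> tp-0 position is 124 >= 120, so tp-0 is paused and the loop ends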
|
|
@ -10,17 +10,18 @@ import java.util.stream.Collectors;
|
|||
import javax.annotation.Nullable;
|
||||
import lombok.AccessLevel;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import org.apache.commons.lang3.mutable.MutableLong;
|
||||
import org.apache.kafka.clients.consumer.Consumer;
|
||||
import org.apache.kafka.common.TopicPartition;
|
||||
|
||||
@RequiredArgsConstructor(access = AccessLevel.PACKAGE)
|
||||
class SeekOperations {
|
||||
public class SeekOperations {
|
||||
|
||||
private final Consumer<?, ?> consumer;
|
||||
private final OffsetsInfo offsetsInfo;
|
||||
private final Map<TopicPartition, Long> offsetsForSeek; //only contains non-empty partitions!
|
||||
|
||||
static SeekOperations create(Consumer<?, ?> consumer, ConsumerPosition consumerPosition) {
|
||||
public static SeekOperations create(Consumer<?, ?> consumer, ConsumerPosition consumerPosition) {
|
||||
OffsetsInfo offsetsInfo;
|
||||
if (consumerPosition.getSeekTo() == null) {
|
||||
offsetsInfo = new OffsetsInfo(consumer, consumerPosition.getTopic());
|
||||
|
@ -34,25 +35,37 @@ class SeekOperations {
|
|||
);
|
||||
}
|
||||
|
||||
void assignAndSeekNonEmptyPartitions() {
|
||||
public void assignAndSeekNonEmptyPartitions() {
|
||||
consumer.assign(offsetsForSeek.keySet());
|
||||
offsetsForSeek.forEach(consumer::seek);
|
||||
}
|
||||
|
||||
Map<TopicPartition, Long> getBeginOffsets() {
|
||||
public Map<TopicPartition, Long> getBeginOffsets() {
|
||||
return offsetsInfo.getBeginOffsets();
|
||||
}
|
||||
|
||||
Map<TopicPartition, Long> getEndOffsets() {
|
||||
public Map<TopicPartition, Long> getEndOffsets() {
|
||||
return offsetsInfo.getEndOffsets();
|
||||
}
|
||||
|
||||
boolean assignedPartitionsFullyPolled() {
|
||||
public boolean assignedPartitionsFullyPolled() {
|
||||
return offsetsInfo.assignedPartitionsFullyPolled();
|
||||
}
|
||||
|
||||
// sum of (end - start) offsets for all partitions
|
||||
public long summaryOffsetsRange() {
|
||||
return offsetsInfo.summaryOffsetsRange();
|
||||
}
|
||||
|
||||
// sum of differences between initial consumer seek and current consumer position (across all partitions)
|
||||
public long offsetsProcessedFromSeek() {
|
||||
MutableLong count = new MutableLong();
|
||||
offsetsForSeek.forEach((tp, initialOffset) -> count.add(consumer.position(tp) - initialOffset));
|
||||
return count.getValue();
|
||||
}
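These two additions read like the inputs for a polling-progress estimate, though that is an assumption about how callers use them; something along the lines of:

// Assumption about intended use -- not code from this changeset.
double progress = (double) seekOperations.offsetsProcessedFromSeek()
    / Math.max(1, seekOperations.summaryOffsetsRange());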
|
||||
|
||||
// Get offsets to seek to. NOTE: offsets do not contain empty partitions offsets
|
||||
Map<TopicPartition, Long> getOffsetsForSeek() {
|
||||
public Map<TopicPartition, Long> getOffsetsForSeek() {
|
||||
return offsetsForSeek;
|
||||
}
|
||||
|
||||
|
@ -61,19 +74,19 @@ class SeekOperations {
|
|||
*/
|
||||
@VisibleForTesting
|
||||
static Map<TopicPartition, Long> getOffsetsForSeek(Consumer<?, ?> consumer,
|
||||
OffsetsInfo offsetsInfo,
|
||||
SeekTypeDTO seekType,
|
||||
@Nullable Map<TopicPartition, Long> seekTo) {
|
||||
OffsetsInfo offsetsInfo,
|
||||
SeekTypeDTO seekType,
|
||||
@Nullable Map<TopicPartition, Long> seekTo) {
|
||||
switch (seekType) {
|
||||
case LATEST:
|
||||
return consumer.endOffsets(offsetsInfo.getNonEmptyPartitions());
|
||||
case BEGINNING:
|
||||
return consumer.beginningOffsets(offsetsInfo.getNonEmptyPartitions());
|
||||
case OFFSET:
|
||||
Preconditions.checkNotNull(offsetsInfo);
|
||||
Preconditions.checkNotNull(seekTo);
|
||||
return fixOffsets(offsetsInfo, seekTo);
|
||||
case TIMESTAMP:
|
||||
Preconditions.checkNotNull(offsetsInfo);
|
||||
Preconditions.checkNotNull(seekTo);
|
||||
return offsetsForTimestamp(consumer, offsetsInfo, seekTo);
|
||||
default:
|
||||
throw new IllegalStateException();
|
||||
|
@ -100,7 +113,7 @@ class SeekOperations {
|
|||
}
|
||||
|
||||
private static Map<TopicPartition, Long> offsetsForTimestamp(Consumer<?, ?> consumer, OffsetsInfo offsetsInfo,
|
||||
Map<TopicPartition, Long> timestamps) {
|
||||
Map<TopicPartition, Long> timestamps) {
|
||||
timestamps = new HashMap<>(timestamps);
|
||||
timestamps.keySet().retainAll(offsetsInfo.getNonEmptyPartitions());
|
||||
|
||||
|
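For orientation, a minimal usage sketch (editorial, not part of this diff) of the now-public SeekOperations API; the poll loop, the 500 ms timeout and the sendMessage helper are assumptions:

    // Sketch only — illustrates the methods made public above.
    SeekOperations seekOps = SeekOperations.create(consumer, consumerPosition);
    seekOps.assignAndSeekNonEmptyPartitions();
    long totalRange = seekOps.summaryOffsetsRange();             // sum of (end - start) over all partitions
    while (!sink.isCancelled() && !seekOps.assignedPartitionsFullyPolled()) {
      var records = consumer.poll(java.time.Duration.ofMillis(500));
      records.forEach(r -> sendMessage(sink, r));                // hypothetical emit helper
      long done = seekOps.offsetsProcessedFromSeek();            // offsets consumed since the initial seek
      // done / (double) totalRange can back a progress estimate
    }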
|
|
@@ -1,25 +1,28 @@
 package com.provectus.kafka.ui.emitter;
 
 import com.provectus.kafka.ui.model.ConsumerPosition;
+import com.provectus.kafka.ui.model.TopicMessageDTO;
 import com.provectus.kafka.ui.model.TopicMessageEventDTO;
+import com.provectus.kafka.ui.serdes.ConsumerRecordDeserializer;
 import java.util.HashMap;
+import java.util.function.Predicate;
 import java.util.function.Supplier;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.common.errors.InterruptException;
 import reactor.core.publisher.FluxSink;
 
 @Slf4j
-public class TailingEmitter extends AbstractEmitter
-    implements java.util.function.Consumer<FluxSink<TopicMessageEventDTO>> {
+public class TailingEmitter extends AbstractEmitter {
 
   private final Supplier<EnhancedConsumer> consumerSupplier;
   private final ConsumerPosition consumerPosition;
 
   public TailingEmitter(Supplier<EnhancedConsumer> consumerSupplier,
                         ConsumerPosition consumerPosition,
-                        MessagesProcessing messagesProcessing,
+                        ConsumerRecordDeserializer deserializer,
+                        Predicate<TopicMessageDTO> filter,
                         PollingSettings pollingSettings) {
-    super(messagesProcessing, pollingSettings);
+    super(new MessagesProcessing(deserializer, filter, false, null), pollingSettings);
    this.consumerSupplier = consumerSupplier;
    this.consumerPosition = consumerPosition;
  }

@@ -32,7 +35,7 @@ public class TailingEmitter extends AbstractEmitter
     while (!sink.isCancelled()) {
       sendPhase(sink, "Polling");
       var polled = poll(sink, consumer);
-      polled.forEach(r -> sendMessage(sink, r));
+      send(sink, polled);
     }
     sink.complete();
     log.debug("Tailing finished");
|
||||
|
|
|
@@ -106,7 +106,7 @@ public class GlobalErrorWebExceptionHandler extends AbstractErrorWebExceptionHandler
           err.setFieldName(e.getKey());
           err.setRestrictions(List.copyOf(e.getValue()));
           return err;
-        }).collect(Collectors.toList());
+        }).toList();
 
     var message = fieldsErrors.isEmpty()
         ? exception.getMessage()
|
||||
|
|
|
@ -30,11 +30,12 @@ import com.provectus.kafka.ui.model.ReplicaDTO;
|
|||
import com.provectus.kafka.ui.model.TopicConfigDTO;
|
||||
import com.provectus.kafka.ui.model.TopicDTO;
|
||||
import com.provectus.kafka.ui.model.TopicDetailsDTO;
|
||||
import com.provectus.kafka.ui.model.TopicProducerStateDTO;
|
||||
import com.provectus.kafka.ui.service.metrics.RawMetric;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.stream.Collectors;
|
||||
import org.apache.kafka.clients.admin.ConfigEntry;
|
||||
import org.apache.kafka.clients.admin.ProducerState;
|
||||
import org.apache.kafka.common.acl.AccessControlEntry;
|
||||
import org.apache.kafka.common.acl.AclBinding;
|
||||
import org.apache.kafka.common.acl.AclOperation;
|
||||
|
@ -54,7 +55,7 @@ public interface ClusterMapper {
|
|||
|
||||
default ClusterMetricsDTO toClusterMetrics(Metrics metrics) {
|
||||
return new ClusterMetricsDTO()
|
||||
.items(metrics.getSummarizedMetrics().map(this::convert).collect(Collectors.toList()));
|
||||
.items(metrics.getSummarizedMetrics().map(this::convert).toList());
|
||||
}
|
||||
|
||||
private MetricDTO convert(RawMetric rawMetric) {
|
||||
|
@ -66,7 +67,7 @@ public interface ClusterMapper {
|
|||
|
||||
default BrokerMetricsDTO toBrokerMetrics(List<RawMetric> metrics) {
|
||||
return new BrokerMetricsDTO()
|
||||
.metrics(metrics.stream().map(this::convert).collect(Collectors.toList()));
|
||||
.metrics(metrics.stream().map(this::convert).toList());
|
||||
}
|
||||
|
||||
@Mapping(target = "isSensitive", source = "sensitive")
|
||||
|
@ -107,7 +108,7 @@ public interface ClusterMapper {
|
|||
List<ClusterDTO.FeaturesEnum> toFeaturesEnum(List<ClusterFeature> features);
|
||||
|
||||
default List<PartitionDTO> map(Map<Integer, InternalPartition> map) {
|
||||
return map.values().stream().map(this::toPartition).collect(Collectors.toList());
|
||||
return map.values().stream().map(this::toPartition).toList();
|
||||
}
|
||||
|
||||
default BrokerDiskUsageDTO map(Integer id, InternalBrokerDiskUsage internalBrokerDiskUsage) {
|
||||
|
@@ -118,6 +119,17 @@
     return brokerDiskUsage;
   }
 
+  default TopicProducerStateDTO map(int partition, ProducerState state) {
+    return new TopicProducerStateDTO()
+        .partition(partition)
+        .producerId(state.producerId())
+        .producerEpoch(state.producerEpoch())
+        .lastSequence(state.lastSequence())
+        .lastTimestampMs(state.lastTimestamp())
+        .coordinatorEpoch(state.coordinatorEpoch().stream().boxed().findAny().orElse(null))
+        .currentTransactionStartOffset(state.currentTransactionStartOffset().stream().boxed().findAny().orElse(null));
+  }
+
   static KafkaAclDTO.OperationEnum mapAclOperation(AclOperation operation) {
     return switch (operation) {
       case ALL -> KafkaAclDTO.OperationEnum.ALL;
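The new map(int, ProducerState) default method above converts per-partition producer states coming from the Kafka AdminClient. A hedged sketch (broker address, topic and partition are placeholders) of fetching the objects it maps:

    // Sketch only: obtaining the ProducerState list that the mapper converts to TopicProducerStateDTO.
    try (Admin admin = Admin.create(Map.of(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"))) {
      TopicPartition tp = new TopicPartition("orders", 0);
      List<ProducerState> states = admin.describeProducers(List.of(tp))
          .partitionResult(tp)
          .get()                 // KafkaFuture.get() throws checked exceptions; wrap or declare as needed
          .activeProducers();
      states.forEach(s -> System.out.printf("producerId=%d epoch=%d%n", s.producerId(), s.producerEpoch()));
    }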
|
||||
|
|
|
@ -21,7 +21,7 @@ public class DescribeLogDirsMapper {
|
|||
return logDirsInfo.entrySet().stream().map(
|
||||
mapEntry -> mapEntry.getValue().entrySet().stream()
|
||||
.map(e -> toBrokerLogDirs(mapEntry.getKey(), e.getKey(), e.getValue()))
|
||||
.collect(Collectors.toList())
|
||||
.toList()
|
||||
).flatMap(Collection::stream).collect(Collectors.toList());
|
||||
}
|
||||
|
||||
|
@ -35,7 +35,7 @@ public class DescribeLogDirsMapper {
|
|||
var topics = logDirInfo.replicaInfos.entrySet().stream()
|
||||
.collect(Collectors.groupingBy(e -> e.getKey().topic())).entrySet().stream()
|
||||
.map(e -> toTopicLogDirs(broker, e.getKey(), e.getValue()))
|
||||
.collect(Collectors.toList());
|
||||
.toList();
|
||||
result.setTopics(topics);
|
||||
return result;
|
||||
}
|
||||
|
@ -48,7 +48,7 @@ public class DescribeLogDirsMapper {
|
|||
topic.setPartitions(
|
||||
partitions.stream().map(
|
||||
e -> topicPartitionLogDir(
|
||||
broker, e.getKey().partition(), e.getValue())).collect(Collectors.toList())
|
||||
broker, e.getKey().partition(), e.getValue())).toList()
|
||||
);
|
||||
return topic;
|
||||
}
|
||||
|
|
|
@ -44,7 +44,7 @@ public class InternalLogDirStats {
|
|||
topicMap.getValue().replicaInfos.entrySet().stream()
|
||||
.map(e -> Tuples.of(b.getKey(), e.getKey(), e.getValue().size))
|
||||
)
|
||||
).collect(toList());
|
||||
).toList();
|
||||
|
||||
partitionsStats = topicPartitions.stream().collect(
|
||||
groupingBy(
|
||||
|
|
|
@ -52,6 +52,8 @@ public class AccessContext {
|
|||
}
|
||||
|
||||
public static final class AccessContextBuilder {
|
||||
private static final String ACTIONS_NOT_PRESENT = "actions not present";
|
||||
|
||||
private Collection<ApplicationConfigAction> applicationConfigActions = Collections.emptySet();
|
||||
private String cluster;
|
||||
private Collection<ClusterConfigAction> clusterConfigActions = Collections.emptySet();
|
||||
|
@ -75,7 +77,7 @@ public class AccessContext {
|
|||
}
|
||||
|
||||
public AccessContextBuilder applicationConfigActions(ApplicationConfigAction... actions) {
|
||||
Assert.isTrue(actions.length > 0, "actions not present");
|
||||
Assert.isTrue(actions.length > 0, ACTIONS_NOT_PRESENT);
|
||||
this.applicationConfigActions = List.of(actions);
|
||||
return this;
|
||||
}
|
||||
|
@ -86,7 +88,7 @@ public class AccessContext {
|
|||
}
|
||||
|
||||
public AccessContextBuilder clusterConfigActions(ClusterConfigAction... actions) {
|
||||
Assert.isTrue(actions.length > 0, "actions not present");
|
||||
Assert.isTrue(actions.length > 0, ACTIONS_NOT_PRESENT);
|
||||
this.clusterConfigActions = List.of(actions);
|
||||
return this;
|
||||
}
|
||||
|
@ -97,7 +99,7 @@ public class AccessContext {
|
|||
}
|
||||
|
||||
public AccessContextBuilder topicActions(TopicAction... actions) {
|
||||
Assert.isTrue(actions.length > 0, "actions not present");
|
||||
Assert.isTrue(actions.length > 0, ACTIONS_NOT_PRESENT);
|
||||
this.topicActions = List.of(actions);
|
||||
return this;
|
||||
}
|
||||
|
@ -108,7 +110,7 @@ public class AccessContext {
|
|||
}
|
||||
|
||||
public AccessContextBuilder consumerGroupActions(ConsumerGroupAction... actions) {
|
||||
Assert.isTrue(actions.length > 0, "actions not present");
|
||||
Assert.isTrue(actions.length > 0, ACTIONS_NOT_PRESENT);
|
||||
this.consumerGroupActions = List.of(actions);
|
||||
return this;
|
||||
}
|
||||
|
@ -119,7 +121,7 @@ public class AccessContext {
|
|||
}
|
||||
|
||||
public AccessContextBuilder connectActions(ConnectAction... actions) {
|
||||
Assert.isTrue(actions.length > 0, "actions not present");
|
||||
Assert.isTrue(actions.length > 0, ACTIONS_NOT_PRESENT);
|
||||
this.connectActions = List.of(actions);
|
||||
return this;
|
||||
}
|
||||
|
@ -135,25 +137,25 @@ public class AccessContext {
|
|||
}
|
||||
|
||||
public AccessContextBuilder schemaActions(SchemaAction... actions) {
|
||||
Assert.isTrue(actions.length > 0, "actions not present");
|
||||
Assert.isTrue(actions.length > 0, ACTIONS_NOT_PRESENT);
|
||||
this.schemaActions = List.of(actions);
|
||||
return this;
|
||||
}
|
||||
|
||||
public AccessContextBuilder ksqlActions(KsqlAction... actions) {
|
||||
Assert.isTrue(actions.length > 0, "actions not present");
|
||||
Assert.isTrue(actions.length > 0, ACTIONS_NOT_PRESENT);
|
||||
this.ksqlActions = List.of(actions);
|
||||
return this;
|
||||
}
|
||||
|
||||
public AccessContextBuilder aclActions(AclAction... actions) {
|
||||
Assert.isTrue(actions.length > 0, "actions not present");
|
||||
Assert.isTrue(actions.length > 0, ACTIONS_NOT_PRESENT);
|
||||
this.aclActions = List.of(actions);
|
||||
return this;
|
||||
}
|
||||
|
||||
public AccessContextBuilder auditActions(AuditAction... actions) {
|
||||
Assert.isTrue(actions.length > 0, "actions not present");
|
||||
Assert.isTrue(actions.length > 0, ACTIONS_NOT_PRESENT);
|
||||
this.auditActions = List.of(actions);
|
||||
return this;
|
||||
}
|
||||
|
|
|
@ -23,7 +23,7 @@ import javax.annotation.Nullable;
|
|||
import lombok.EqualsAndHashCode;
|
||||
import lombok.Getter;
|
||||
import lombok.ToString;
|
||||
import org.apache.commons.collections.CollectionUtils;
|
||||
import org.apache.commons.collections4.CollectionUtils;
|
||||
import org.springframework.util.Assert;
|
||||
|
||||
@Getter
|
||||
|
|
|
@@ -1,15 +1,25 @@
 package com.provectus.kafka.ui.model.rbac.permission;
 
+import java.util.Set;
 import org.apache.commons.lang3.EnumUtils;
 import org.jetbrains.annotations.Nullable;
 
 public enum AclAction implements PermissibleAction {
 
   VIEW,
-  EDIT;
+  EDIT
+
+  ;
+
+  public static final Set<AclAction> ALTER_ACTIONS = Set.of(EDIT);
 
   @Nullable
   public static AclAction fromString(String name) {
     return EnumUtils.getEnum(AclAction.class, name);
   }
+
+  @Override
+  public boolean isAlter() {
+    return ALTER_ACTIONS.contains(this);
+  }
 }
|
||||
|
|
|
@ -1,5 +1,6 @@
|
|||
package com.provectus.kafka.ui.model.rbac.permission;
|
||||
|
||||
import java.util.Set;
|
||||
import org.apache.commons.lang3.EnumUtils;
|
||||
import org.jetbrains.annotations.Nullable;
|
||||
|
||||
|
@ -10,9 +11,15 @@ public enum ApplicationConfigAction implements PermissibleAction {
|
|||
|
||||
;
|
||||
|
||||
public static final Set<ApplicationConfigAction> ALTER_ACTIONS = Set.of(EDIT);
|
||||
|
||||
@Nullable
|
||||
public static ApplicationConfigAction fromString(String name) {
|
||||
return EnumUtils.getEnum(ApplicationConfigAction.class, name);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isAlter() {
|
||||
return ALTER_ACTIONS.contains(this);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,14 +1,24 @@
|
|||
package com.provectus.kafka.ui.model.rbac.permission;
|
||||
|
||||
import java.util.Set;
|
||||
import org.apache.commons.lang3.EnumUtils;
|
||||
import org.jetbrains.annotations.Nullable;
|
||||
|
||||
public enum AuditAction implements PermissibleAction {
|
||||
|
||||
VIEW;
|
||||
VIEW
|
||||
|
||||
;
|
||||
|
||||
private static final Set<AuditAction> ALTER_ACTIONS = Set.of();
|
||||
|
||||
@Nullable
|
||||
public static AuditAction fromString(String name) {
|
||||
return EnumUtils.getEnum(AuditAction.class, name);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isAlter() {
|
||||
return ALTER_ACTIONS.contains(this);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,5 +1,6 @@
|
|||
package com.provectus.kafka.ui.model.rbac.permission;
|
||||
|
||||
import java.util.Set;
|
||||
import org.apache.commons.lang3.EnumUtils;
|
||||
import org.jetbrains.annotations.Nullable;
|
||||
|
||||
|
@ -10,9 +11,15 @@ public enum ClusterConfigAction implements PermissibleAction {
|
|||
|
||||
;
|
||||
|
||||
public static final Set<ClusterConfigAction> ALTER_ACTIONS = Set.of(EDIT);
|
||||
|
||||
@Nullable
|
||||
public static ClusterConfigAction fromString(String name) {
|
||||
return EnumUtils.getEnum(ClusterConfigAction.class, name);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isAlter() {
|
||||
return ALTER_ACTIONS.contains(this);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,5 +1,6 @@
|
|||
package com.provectus.kafka.ui.model.rbac.permission;
|
||||
|
||||
import java.util.Set;
|
||||
import org.apache.commons.lang3.EnumUtils;
|
||||
import org.jetbrains.annotations.Nullable;
|
||||
|
||||
|
@ -12,9 +13,15 @@ public enum ConnectAction implements PermissibleAction {
|
|||
|
||||
;
|
||||
|
||||
public static final Set<ConnectAction> ALTER_ACTIONS = Set.of(CREATE, EDIT, RESTART);
|
||||
|
||||
@Nullable
|
||||
public static ConnectAction fromString(String name) {
|
||||
return EnumUtils.getEnum(ConnectAction.class, name);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isAlter() {
|
||||
return ALTER_ACTIONS.contains(this);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,5 +1,6 @@
|
|||
package com.provectus.kafka.ui.model.rbac.permission;
|
||||
|
||||
import java.util.Set;
|
||||
import org.apache.commons.lang3.EnumUtils;
|
||||
import org.jetbrains.annotations.Nullable;
|
||||
|
||||
|
@ -7,14 +8,19 @@ public enum ConsumerGroupAction implements PermissibleAction {
|
|||
|
||||
VIEW,
|
||||
DELETE,
|
||||
|
||||
RESET_OFFSETS
|
||||
|
||||
;
|
||||
|
||||
public static final Set<ConsumerGroupAction> ALTER_ACTIONS = Set.of(DELETE, RESET_OFFSETS);
|
||||
|
||||
@Nullable
|
||||
public static ConsumerGroupAction fromString(String name) {
|
||||
return EnumUtils.getEnum(ConsumerGroupAction.class, name);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isAlter() {
|
||||
return ALTER_ACTIONS.contains(this);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,15 +1,24 @@
|
|||
package com.provectus.kafka.ui.model.rbac.permission;
|
||||
|
||||
import java.util.Set;
|
||||
import org.apache.commons.lang3.EnumUtils;
|
||||
import org.jetbrains.annotations.Nullable;
|
||||
|
||||
public enum KsqlAction implements PermissibleAction {
|
||||
|
||||
EXECUTE;
|
||||
EXECUTE
|
||||
|
||||
;
|
||||
|
||||
public static final Set<KsqlAction> ALTER_ACTIONS = Set.of(EXECUTE);
|
||||
|
||||
@Nullable
|
||||
public static KsqlAction fromString(String name) {
|
||||
return EnumUtils.getEnum(KsqlAction.class, name);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isAlter() {
|
||||
return ALTER_ACTIONS.contains(this);
|
||||
}
|
||||
}
|
||||
|
|
|
@@ -5,4 +5,9 @@ public sealed interface PermissibleAction permits
     ConsumerGroupAction, SchemaAction,
     ConnectAction, ClusterConfigAction,
     KsqlAction, TopicAction, AuditAction {
 
   String name();
 
+  boolean isAlter();
+
 }
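A small illustrative sketch (not from this compare) of how isAlter() lets callers separate read-only permissions from state-changing ones; the accessor and audit sink names are assumptions:

    // Sketch: audit only the actions that change state.
    boolean needsAudit = context.getTopicActions().stream()   // hypothetical AccessContext accessor
        .anyMatch(PermissibleAction::isAlter);
    if (needsAudit) {
      auditWriter.write(context);                             // hypothetical audit sink
    }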
|
||||
|
|
|
@ -1,5 +1,6 @@
|
|||
package com.provectus.kafka.ui.model.rbac.permission;
|
||||
|
||||
import java.util.Set;
|
||||
import org.apache.commons.lang3.EnumUtils;
|
||||
import org.jetbrains.annotations.Nullable;
|
||||
|
||||
|
@ -13,9 +14,15 @@ public enum SchemaAction implements PermissibleAction {
|
|||
|
||||
;
|
||||
|
||||
public static final Set<SchemaAction> ALTER_ACTIONS = Set.of(CREATE, DELETE, EDIT, MODIFY_GLOBAL_COMPATIBILITY);
|
||||
|
||||
@Nullable
|
||||
public static SchemaAction fromString(String name) {
|
||||
return EnumUtils.getEnum(SchemaAction.class, name);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isAlter() {
|
||||
return ALTER_ACTIONS.contains(this);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,5 +1,6 @@
|
|||
package com.provectus.kafka.ui.model.rbac.permission;
|
||||
|
||||
import java.util.Set;
|
||||
import org.apache.commons.lang3.EnumUtils;
|
||||
import org.jetbrains.annotations.Nullable;
|
||||
|
||||
|
@ -9,16 +10,21 @@ public enum TopicAction implements PermissibleAction {
|
|||
CREATE,
|
||||
EDIT,
|
||||
DELETE,
|
||||
|
||||
MESSAGES_READ,
|
||||
MESSAGES_PRODUCE,
|
||||
MESSAGES_DELETE,
|
||||
|
||||
;
|
||||
|
||||
public static final Set<TopicAction> ALTER_ACTIONS = Set.of(CREATE, EDIT, DELETE, MESSAGES_PRODUCE, MESSAGES_DELETE);
|
||||
|
||||
@Nullable
|
||||
public static TopicAction fromString(String name) {
|
||||
return EnumUtils.getEnum(TopicAction.class, name);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isAlter() {
|
||||
return ALTER_ACTIONS.contains(this);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
package com.provectus.kafka.ui.serdes;
|
||||
|
||||
import com.provectus.kafka.ui.model.TopicMessageDTO;
|
||||
import com.provectus.kafka.ui.model.TopicMessageDTO.TimestampTypeEnum;
|
||||
import com.provectus.kafka.ui.serde.api.Serde;
|
||||
import java.time.Instant;
|
||||
import java.time.OffsetDateTime;
|
||||
|
@ -8,6 +9,7 @@ import java.time.ZoneId;
|
|||
import java.util.Arrays;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.function.UnaryOperator;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.kafka.clients.consumer.ConsumerRecord;
|
||||
|
@@ -32,6 +34,8 @@ public class ConsumerRecordDeserializer {
   private final Serde.Deserializer fallbackKeyDeserializer;
   private final Serde.Deserializer fallbackValueDeserializer;
 
+  private final UnaryOperator<TopicMessageDTO> masker;
+
   public TopicMessageDTO deserialize(ConsumerRecord<Bytes, Bytes> rec) {
     var message = new TopicMessageDTO();
     fillKey(message, rec);
|
||||
|
@@ -47,20 +51,15 @@ public class ConsumerRecordDeserializer {
     message.setValueSize(getValueSize(rec));
     message.setHeadersSize(getHeadersSize(rec));
 
-    return message;
+    return masker.apply(message);
   }
 
-  private static TopicMessageDTO.TimestampTypeEnum mapToTimestampType(TimestampType timestampType) {
-    switch (timestampType) {
-      case CREATE_TIME:
-        return TopicMessageDTO.TimestampTypeEnum.CREATE_TIME;
-      case LOG_APPEND_TIME:
-        return TopicMessageDTO.TimestampTypeEnum.LOG_APPEND_TIME;
-      case NO_TIMESTAMP_TYPE:
-        return TopicMessageDTO.TimestampTypeEnum.NO_TIMESTAMP_TYPE;
-      default:
-        throw new IllegalArgumentException("Unknown timestampType: " + timestampType);
-    }
+  private static TimestampTypeEnum mapToTimestampType(TimestampType timestampType) {
+    return switch (timestampType) {
+      case CREATE_TIME -> TimestampTypeEnum.CREATE_TIME;
+      case LOG_APPEND_TIME -> TimestampTypeEnum.LOG_APPEND_TIME;
+      case NO_TIMESTAMP_TYPE -> TimestampTypeEnum.NO_TIMESTAMP_TYPE;
+    };
   }
|
||||
|
||||
private void fillHeaders(TopicMessageDTO message, ConsumerRecord<Bytes, Bytes> rec) {
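The masker above is a plain UnaryOperator<TopicMessageDTO> applied to every deserialized record. A hedged sketch of such an operator (the DTO accessors are assumptions):

    // Sketch only: a masker that redacts message values before they leave the deserializer.
    UnaryOperator<TopicMessageDTO> masker = msg -> {
      if (msg.getContent() != null) {
        msg.setContent("***MASKED***");
      }
      return msg;
    };
    // UnaryOperator.identity() would model the "no masking configured" case.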
|
||||
|
|
|
@@ -16,6 +16,7 @@ import com.provectus.kafka.ui.serdes.builtin.HexSerde;
 import com.provectus.kafka.ui.serdes.builtin.Int32Serde;
 import com.provectus.kafka.ui.serdes.builtin.Int64Serde;
 import com.provectus.kafka.ui.serdes.builtin.ProtobufFileSerde;
+import com.provectus.kafka.ui.serdes.builtin.ProtobufRawSerde;
 import com.provectus.kafka.ui.serdes.builtin.StringSerde;
 import com.provectus.kafka.ui.serdes.builtin.UInt32Serde;
 import com.provectus.kafka.ui.serdes.builtin.UInt64Serde;

@@ -50,6 +51,7 @@ public class SerdesInitializer {
             .put(Base64Serde.name(), Base64Serde.class)
             .put(HexSerde.name(), HexSerde.class)
             .put(UuidBinarySerde.name(), UuidBinarySerde.class)
+            .put(ProtobufRawSerde.name(), ProtobufRawSerde.class)
             .build(),
         new CustomSerdeLoader()
     );
|
||||
|
|
|
@ -6,7 +6,6 @@ import com.provectus.kafka.ui.serdes.BuiltInSerde;
|
|||
import java.util.Base64;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import org.apache.kafka.common.header.Headers;
|
||||
|
||||
public class Base64Serde implements BuiltInSerde {
|
||||
|
||||
|
|
|
@ -2,6 +2,7 @@ package com.provectus.kafka.ui.serdes.builtin;
|
|||
|
||||
import com.fasterxml.jackson.core.JsonGenerator;
|
||||
import com.fasterxml.jackson.databind.JsonSerializer;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.fasterxml.jackson.databind.SerializerProvider;
|
||||
import com.fasterxml.jackson.databind.json.JsonMapper;
|
||||
import com.fasterxml.jackson.databind.module.SimpleModule;
|
||||
|
@@ -27,6 +28,23 @@ public class ConsumerOffsetsSerde implements BuiltInSerde {
 
   private static final JsonMapper JSON_MAPPER = createMapper();
 
+  private static final String ASSIGNMENT = "assignment";
+  private static final String CLIENT_HOST = "client_host";
+  private static final String CLIENT_ID = "client_id";
+  private static final String COMMIT_TIMESTAMP = "commit_timestamp";
+  private static final String CURRENT_STATE_TIMESTAMP = "current_state_timestamp";
+  private static final String GENERATION = "generation";
+  private static final String LEADER = "leader";
+  private static final String MEMBERS = "members";
+  private static final String MEMBER_ID = "member_id";
+  private static final String METADATA = "metadata";
+  private static final String OFFSET = "offset";
+  private static final String PROTOCOL = "protocol";
+  private static final String PROTOCOL_TYPE = "protocol_type";
+  private static final String REBALANCE_TIMEOUT = "rebalance_timeout";
+  private static final String SESSION_TIMEOUT = "session_timeout";
+  private static final String SUBSCRIPTION = "subscription";
+
   public static final String TOPIC = "__consumer_offsets";
 
   public static String name() {
|
||||
|
@ -115,128 +133,128 @@ public class ConsumerOffsetsSerde implements BuiltInSerde {
|
|||
private Deserializer valueDeserializer() {
|
||||
final Schema commitOffsetSchemaV0 =
|
||||
new Schema(
|
||||
new Field("offset", Type.INT64, ""),
|
||||
new Field("metadata", Type.STRING, ""),
|
||||
new Field("commit_timestamp", Type.INT64, "")
|
||||
new Field(OFFSET, Type.INT64, ""),
|
||||
new Field(METADATA, Type.STRING, ""),
|
||||
new Field(COMMIT_TIMESTAMP, Type.INT64, "")
|
||||
);
|
||||
|
||||
final Schema commitOffsetSchemaV1 =
|
||||
new Schema(
|
||||
new Field("offset", Type.INT64, ""),
|
||||
new Field("metadata", Type.STRING, ""),
|
||||
new Field("commit_timestamp", Type.INT64, ""),
|
||||
new Field(OFFSET, Type.INT64, ""),
|
||||
new Field(METADATA, Type.STRING, ""),
|
||||
new Field(COMMIT_TIMESTAMP, Type.INT64, ""),
|
||||
new Field("expire_timestamp", Type.INT64, "")
|
||||
);
|
||||
|
||||
final Schema commitOffsetSchemaV2 =
|
||||
new Schema(
|
||||
new Field("offset", Type.INT64, ""),
|
||||
new Field("metadata", Type.STRING, ""),
|
||||
new Field("commit_timestamp", Type.INT64, "")
|
||||
new Field(OFFSET, Type.INT64, ""),
|
||||
new Field(METADATA, Type.STRING, ""),
|
||||
new Field(COMMIT_TIMESTAMP, Type.INT64, "")
|
||||
);
|
||||
|
||||
final Schema commitOffsetSchemaV3 =
|
||||
new Schema(
|
||||
new Field("offset", Type.INT64, ""),
|
||||
new Field(OFFSET, Type.INT64, ""),
|
||||
new Field("leader_epoch", Type.INT32, ""),
|
||||
new Field("metadata", Type.STRING, ""),
|
||||
new Field("commit_timestamp", Type.INT64, "")
|
||||
new Field(METADATA, Type.STRING, ""),
|
||||
new Field(COMMIT_TIMESTAMP, Type.INT64, "")
|
||||
);
|
||||
|
||||
final Schema commitOffsetSchemaV4 = new Schema(
|
||||
new Field("offset", Type.INT64, ""),
|
||||
new Field(OFFSET, Type.INT64, ""),
|
||||
new Field("leader_epoch", Type.INT32, ""),
|
||||
new Field("metadata", Type.COMPACT_STRING, ""),
|
||||
new Field("commit_timestamp", Type.INT64, ""),
|
||||
new Field(METADATA, Type.COMPACT_STRING, ""),
|
||||
new Field(COMMIT_TIMESTAMP, Type.INT64, ""),
|
||||
Field.TaggedFieldsSection.of()
|
||||
);
|
||||
|
||||
final Schema metadataSchema0 =
|
||||
new Schema(
|
||||
new Field("protocol_type", Type.STRING, ""),
|
||||
new Field("generation", Type.INT32, ""),
|
||||
new Field("protocol", Type.NULLABLE_STRING, ""),
|
||||
new Field("leader", Type.NULLABLE_STRING, ""),
|
||||
new Field("members", new ArrayOf(new Schema(
|
||||
new Field("member_id", Type.STRING, ""),
|
||||
new Field("client_id", Type.STRING, ""),
|
||||
new Field("client_host", Type.STRING, ""),
|
||||
new Field("session_timeout", Type.INT32, ""),
|
||||
new Field("subscription", Type.BYTES, ""),
|
||||
new Field("assignment", Type.BYTES, "")
|
||||
new Field(PROTOCOL_TYPE, Type.STRING, ""),
|
||||
new Field(GENERATION, Type.INT32, ""),
|
||||
new Field(PROTOCOL, Type.NULLABLE_STRING, ""),
|
||||
new Field(LEADER, Type.NULLABLE_STRING, ""),
|
||||
new Field(MEMBERS, new ArrayOf(new Schema(
|
||||
new Field(MEMBER_ID, Type.STRING, ""),
|
||||
new Field(CLIENT_ID, Type.STRING, ""),
|
||||
new Field(CLIENT_HOST, Type.STRING, ""),
|
||||
new Field(SESSION_TIMEOUT, Type.INT32, ""),
|
||||
new Field(SUBSCRIPTION, Type.BYTES, ""),
|
||||
new Field(ASSIGNMENT, Type.BYTES, "")
|
||||
)), "")
|
||||
);
|
||||
|
||||
final Schema metadataSchema1 =
|
||||
new Schema(
|
||||
new Field("protocol_type", Type.STRING, ""),
|
||||
new Field("generation", Type.INT32, ""),
|
||||
new Field("protocol", Type.NULLABLE_STRING, ""),
|
||||
new Field("leader", Type.NULLABLE_STRING, ""),
|
||||
new Field("members", new ArrayOf(new Schema(
|
||||
new Field("member_id", Type.STRING, ""),
|
||||
new Field("client_id", Type.STRING, ""),
|
||||
new Field("client_host", Type.STRING, ""),
|
||||
new Field("rebalance_timeout", Type.INT32, ""),
|
||||
new Field("session_timeout", Type.INT32, ""),
|
||||
new Field("subscription", Type.BYTES, ""),
|
||||
new Field("assignment", Type.BYTES, "")
|
||||
new Field(PROTOCOL_TYPE, Type.STRING, ""),
|
||||
new Field(GENERATION, Type.INT32, ""),
|
||||
new Field(PROTOCOL, Type.NULLABLE_STRING, ""),
|
||||
new Field(LEADER, Type.NULLABLE_STRING, ""),
|
||||
new Field(MEMBERS, new ArrayOf(new Schema(
|
||||
new Field(MEMBER_ID, Type.STRING, ""),
|
||||
new Field(CLIENT_ID, Type.STRING, ""),
|
||||
new Field(CLIENT_HOST, Type.STRING, ""),
|
||||
new Field(REBALANCE_TIMEOUT, Type.INT32, ""),
|
||||
new Field(SESSION_TIMEOUT, Type.INT32, ""),
|
||||
new Field(SUBSCRIPTION, Type.BYTES, ""),
|
||||
new Field(ASSIGNMENT, Type.BYTES, "")
|
||||
)), "")
|
||||
);
|
||||
|
||||
final Schema metadataSchema2 =
|
||||
new Schema(
|
||||
new Field("protocol_type", Type.STRING, ""),
|
||||
new Field("generation", Type.INT32, ""),
|
||||
new Field("protocol", Type.NULLABLE_STRING, ""),
|
||||
new Field("leader", Type.NULLABLE_STRING, ""),
|
||||
new Field("current_state_timestamp", Type.INT64, ""),
|
||||
new Field("members", new ArrayOf(new Schema(
|
||||
new Field("member_id", Type.STRING, ""),
|
||||
new Field("client_id", Type.STRING, ""),
|
||||
new Field("client_host", Type.STRING, ""),
|
||||
new Field("rebalance_timeout", Type.INT32, ""),
|
||||
new Field("session_timeout", Type.INT32, ""),
|
||||
new Field("subscription", Type.BYTES, ""),
|
||||
new Field("assignment", Type.BYTES, "")
|
||||
new Field(PROTOCOL_TYPE, Type.STRING, ""),
|
||||
new Field(GENERATION, Type.INT32, ""),
|
||||
new Field(PROTOCOL, Type.NULLABLE_STRING, ""),
|
||||
new Field(LEADER, Type.NULLABLE_STRING, ""),
|
||||
new Field(CURRENT_STATE_TIMESTAMP, Type.INT64, ""),
|
||||
new Field(MEMBERS, new ArrayOf(new Schema(
|
||||
new Field(MEMBER_ID, Type.STRING, ""),
|
||||
new Field(CLIENT_ID, Type.STRING, ""),
|
||||
new Field(CLIENT_HOST, Type.STRING, ""),
|
||||
new Field(REBALANCE_TIMEOUT, Type.INT32, ""),
|
||||
new Field(SESSION_TIMEOUT, Type.INT32, ""),
|
||||
new Field(SUBSCRIPTION, Type.BYTES, ""),
|
||||
new Field(ASSIGNMENT, Type.BYTES, "")
|
||||
)), "")
|
||||
);
|
||||
|
||||
final Schema metadataSchema3 =
|
||||
new Schema(
|
||||
new Field("protocol_type", Type.STRING, ""),
|
||||
new Field("generation", Type.INT32, ""),
|
||||
new Field("protocol", Type.NULLABLE_STRING, ""),
|
||||
new Field("leader", Type.NULLABLE_STRING, ""),
|
||||
new Field("current_state_timestamp", Type.INT64, ""),
|
||||
new Field("members", new ArrayOf(new Schema(
|
||||
new Field("member_id", Type.STRING, ""),
|
||||
new Field(PROTOCOL_TYPE, Type.STRING, ""),
|
||||
new Field(GENERATION, Type.INT32, ""),
|
||||
new Field(PROTOCOL, Type.NULLABLE_STRING, ""),
|
||||
new Field(LEADER, Type.NULLABLE_STRING, ""),
|
||||
new Field(CURRENT_STATE_TIMESTAMP, Type.INT64, ""),
|
||||
new Field(MEMBERS, new ArrayOf(new Schema(
|
||||
new Field(MEMBER_ID, Type.STRING, ""),
|
||||
new Field("group_instance_id", Type.NULLABLE_STRING, ""),
|
||||
new Field("client_id", Type.STRING, ""),
|
||||
new Field("client_host", Type.STRING, ""),
|
||||
new Field("rebalance_timeout", Type.INT32, ""),
|
||||
new Field("session_timeout", Type.INT32, ""),
|
||||
new Field("subscription", Type.BYTES, ""),
|
||||
new Field("assignment", Type.BYTES, "")
|
||||
new Field(CLIENT_ID, Type.STRING, ""),
|
||||
new Field(CLIENT_HOST, Type.STRING, ""),
|
||||
new Field(REBALANCE_TIMEOUT, Type.INT32, ""),
|
||||
new Field(SESSION_TIMEOUT, Type.INT32, ""),
|
||||
new Field(SUBSCRIPTION, Type.BYTES, ""),
|
||||
new Field(ASSIGNMENT, Type.BYTES, "")
|
||||
)), "")
|
||||
);
|
||||
|
||||
final Schema metadataSchema4 =
|
||||
new Schema(
|
||||
new Field("protocol_type", Type.COMPACT_STRING, ""),
|
||||
new Field("generation", Type.INT32, ""),
|
||||
new Field("protocol", Type.COMPACT_NULLABLE_STRING, ""),
|
||||
new Field("leader", Type.COMPACT_NULLABLE_STRING, ""),
|
||||
new Field("current_state_timestamp", Type.INT64, ""),
|
||||
new Field("members", new CompactArrayOf(new Schema(
|
||||
new Field("member_id", Type.COMPACT_STRING, ""),
|
||||
new Field(PROTOCOL_TYPE, Type.COMPACT_STRING, ""),
|
||||
new Field(GENERATION, Type.INT32, ""),
|
||||
new Field(PROTOCOL, Type.COMPACT_NULLABLE_STRING, ""),
|
||||
new Field(LEADER, Type.COMPACT_NULLABLE_STRING, ""),
|
||||
new Field(CURRENT_STATE_TIMESTAMP, Type.INT64, ""),
|
||||
new Field(MEMBERS, new CompactArrayOf(new Schema(
|
||||
new Field(MEMBER_ID, Type.COMPACT_STRING, ""),
|
||||
new Field("group_instance_id", Type.COMPACT_NULLABLE_STRING, ""),
|
||||
new Field("client_id", Type.COMPACT_STRING, ""),
|
||||
new Field("client_host", Type.COMPACT_STRING, ""),
|
||||
new Field("rebalance_timeout", Type.INT32, ""),
|
||||
new Field("session_timeout", Type.INT32, ""),
|
||||
new Field("subscription", Type.COMPACT_BYTES, ""),
|
||||
new Field("assignment", Type.COMPACT_BYTES, ""),
|
||||
new Field(CLIENT_ID, Type.COMPACT_STRING, ""),
|
||||
new Field(CLIENT_HOST, Type.COMPACT_STRING, ""),
|
||||
new Field(REBALANCE_TIMEOUT, Type.INT32, ""),
|
||||
new Field(SESSION_TIMEOUT, Type.INT32, ""),
|
||||
new Field(SUBSCRIPTION, Type.COMPACT_BYTES, ""),
|
||||
new Field(ASSIGNMENT, Type.COMPACT_BYTES, ""),
|
||||
Field.TaggedFieldsSection.of()
|
||||
)), ""),
|
||||
Field.TaggedFieldsSection.of()
|
||||
|
@@ -248,7 +266,7 @@ public class ConsumerOffsetsSerde implements BuiltInSerde {
       short version = bb.getShort();
       // ideally, we should distinguish if value is commit or metadata
       // by checking record's key, but our current serde structure doesn't allow that.
-      // so, we trying to parse into metadata first and after into commit msg
+      // so, we are trying to parse into metadata first and after into commit msg
       try {
         result = toJson(
             switch (version) {
|
||||
|
|
|
@ -2,7 +2,6 @@ package com.provectus.kafka.ui.serdes.builtin;
|
|||
|
||||
import com.google.common.primitives.Ints;
|
||||
import com.provectus.kafka.ui.serde.api.DeserializeResult;
|
||||
import com.provectus.kafka.ui.serde.api.PropertyResolver;
|
||||
import com.provectus.kafka.ui.serde.api.SchemaDescription;
|
||||
import com.provectus.kafka.ui.serdes.BuiltInSerde;
|
||||
import java.util.Map;
|
||||
|
|
|
@@ -0,0 +1,59 @@
package com.provectus.kafka.ui.serdes.builtin;

import com.google.protobuf.UnknownFieldSet;
import com.provectus.kafka.ui.exception.ValidationException;
import com.provectus.kafka.ui.serde.api.DeserializeResult;
import com.provectus.kafka.ui.serde.api.RecordHeaders;
import com.provectus.kafka.ui.serde.api.SchemaDescription;
import com.provectus.kafka.ui.serdes.BuiltInSerde;
import java.util.Map;
import java.util.Optional;
import lombok.SneakyThrows;

public class ProtobufRawSerde implements BuiltInSerde {

  public static String name() {
    return "ProtobufDecodeRaw";
  }

  @Override
  public Optional<String> getDescription() {
    return Optional.empty();
  }

  @Override
  public Optional<SchemaDescription> getSchema(String topic, Target type) {
    return Optional.empty();
  }

  @Override
  public boolean canSerialize(String topic, Target type) {
    return false;
  }

  @Override
  public boolean canDeserialize(String topic, Target type) {
    return true;
  }

  @Override
  public Serializer serializer(String topic, Target type) {
    throw new UnsupportedOperationException();
  }

  @Override
  public Deserializer deserializer(String topic, Target type) {
    return new Deserializer() {
      @SneakyThrows
      @Override
      public DeserializeResult deserialize(RecordHeaders headers, byte[] data) {
        try {
          UnknownFieldSet unknownFields = UnknownFieldSet.parseFrom(data);
          return new DeserializeResult(unknownFields.toString(), DeserializeResult.Type.STRING, Map.of());
        } catch (Exception e) {
          throw new ValidationException(e.getMessage());
        }
      }
    };
  }
}
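UnknownFieldSet walks the protobuf wire format without any descriptor, which is what lets the serde above render arbitrary messages. A standalone sketch of the same call (the sample payload is only illustrative; parseFrom throws a checked InvalidProtocolBufferException):

    // Sketch: decoding arbitrary protobuf bytes the way ProtobufDecodeRaw does.
    byte[] payload = com.google.protobuf.Int32Value.of(150).toByteArray();  // any protobuf payload works
    UnknownFieldSet fields = UnknownFieldSet.parseFrom(payload);
    System.out.println(fields);  // prints "1: 150" — field number and wire value, no descriptor needed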
|
|
@ -1,46 +0,0 @@
|
|||
package com.provectus.kafka.ui.serdes.builtin.sr;
|
||||
|
||||
import com.provectus.kafka.ui.util.jsonschema.JsonAvroConversion;
|
||||
import io.confluent.kafka.schemaregistry.ParsedSchema;
|
||||
import io.confluent.kafka.schemaregistry.avro.AvroSchema;
|
||||
import io.confluent.kafka.schemaregistry.client.SchemaMetadata;
|
||||
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
|
||||
import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
|
||||
import io.confluent.kafka.serializers.KafkaAvroSerializer;
|
||||
import io.confluent.kafka.serializers.KafkaAvroSerializerConfig;
|
||||
import java.util.Map;
|
||||
import org.apache.kafka.common.serialization.Serializer;
|
||||
|
||||
class AvroSchemaRegistrySerializer extends SchemaRegistrySerializer<Object> {
|
||||
|
||||
AvroSchemaRegistrySerializer(String topic, boolean isKey,
|
||||
SchemaRegistryClient client,
|
||||
SchemaMetadata schema) {
|
||||
super(topic, isKey, client, schema);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Serializer<Object> createSerializer(SchemaRegistryClient client) {
|
||||
var serializer = new KafkaAvroSerializer(client);
|
||||
serializer.configure(
|
||||
Map.of(
|
||||
"schema.registry.url", "wontbeused",
|
||||
AbstractKafkaSchemaSerDeConfig.AUTO_REGISTER_SCHEMAS, false,
|
||||
KafkaAvroSerializerConfig.AVRO_USE_LOGICAL_TYPE_CONVERTERS_CONFIG, true,
|
||||
AbstractKafkaSchemaSerDeConfig.USE_LATEST_VERSION, true
|
||||
),
|
||||
isKey
|
||||
);
|
||||
return serializer;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Object serialize(String value, ParsedSchema schema) {
|
||||
try {
|
||||
return JsonAvroConversion.convertJsonToAvro(value, ((AvroSchema) schema).rawSchema());
|
||||
} catch (Throwable e) {
|
||||
throw new RuntimeException("Failed to serialize record for topic " + topic, e);
|
||||
}
|
||||
|
||||
}
|
||||
}
|
|
@ -1,79 +0,0 @@
|
|||
package com.provectus.kafka.ui.serdes.builtin.sr;
|
||||
|
||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||
import com.fasterxml.jackson.databind.JsonNode;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.provectus.kafka.ui.exception.ValidationException;
|
||||
import com.provectus.kafka.ui.util.annotation.KafkaClientInternalsDependant;
|
||||
import io.confluent.kafka.schemaregistry.ParsedSchema;
|
||||
import io.confluent.kafka.schemaregistry.client.SchemaMetadata;
|
||||
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
|
||||
import io.confluent.kafka.schemaregistry.json.JsonSchema;
|
||||
import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
|
||||
import io.confluent.kafka.serializers.json.KafkaJsonSchemaSerializer;
|
||||
import java.util.Map;
|
||||
import org.apache.kafka.common.serialization.Serializer;
|
||||
|
||||
class JsonSchemaSchemaRegistrySerializer extends SchemaRegistrySerializer<JsonNode> {
|
||||
|
||||
private static final ObjectMapper MAPPER = new ObjectMapper();
|
||||
|
||||
JsonSchemaSchemaRegistrySerializer(String topic,
|
||||
boolean isKey,
|
||||
SchemaRegistryClient client,
|
||||
SchemaMetadata schema) {
|
||||
super(topic, isKey, client, schema);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Serializer<JsonNode> createSerializer(SchemaRegistryClient client) {
|
||||
var serializer = new KafkaJsonSchemaSerializerWithoutSchemaInfer(client);
|
||||
serializer.configure(
|
||||
Map.of(
|
||||
"schema.registry.url", "wontbeused",
|
||||
AbstractKafkaSchemaSerDeConfig.AUTO_REGISTER_SCHEMAS, false,
|
||||
AbstractKafkaSchemaSerDeConfig.USE_LATEST_VERSION, true
|
||||
),
|
||||
isKey
|
||||
);
|
||||
return serializer;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected JsonNode serialize(String value, ParsedSchema schema) {
|
||||
try {
|
||||
JsonNode json = MAPPER.readTree(value);
|
||||
((JsonSchema) schema).validate(json);
|
||||
return json;
|
||||
} catch (JsonProcessingException e) {
|
||||
throw new ValidationException(String.format("'%s' is not valid json", value));
|
||||
} catch (org.everit.json.schema.ValidationException e) {
|
||||
throw new ValidationException(
|
||||
String.format("'%s' does not fit schema: %s", value, e.getAllMessages()));
|
||||
}
|
||||
}
|
||||
|
||||
@KafkaClientInternalsDependant
|
||||
private class KafkaJsonSchemaSerializerWithoutSchemaInfer
|
||||
extends KafkaJsonSchemaSerializer<JsonNode> {
|
||||
|
||||
KafkaJsonSchemaSerializerWithoutSchemaInfer(SchemaRegistryClient client) {
|
||||
super(client);
|
||||
}
|
||||
|
||||
/**
|
||||
* Need to override original method because it tries to infer schema from input
|
||||
* by checking 'schema' json field or @Schema annotation on input class, which is not
|
||||
* possible in our case. So, we just skip all infer logic and pass schema directly.
|
||||
*/
|
||||
@Override
|
||||
public byte[] serialize(String topic, JsonNode rec) {
|
||||
return super.serializeImpl(
|
||||
super.getSubjectName(topic, isKey, rec, schema),
|
||||
rec,
|
||||
(JsonSchema) schema
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -1,50 +0,0 @@
|
|||
package com.provectus.kafka.ui.serdes.builtin.sr;
|
||||
|
||||
import com.google.protobuf.DynamicMessage;
|
||||
import com.google.protobuf.Message;
|
||||
import com.google.protobuf.util.JsonFormat;
|
||||
import io.confluent.kafka.schemaregistry.ParsedSchema;
|
||||
import io.confluent.kafka.schemaregistry.client.SchemaMetadata;
|
||||
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
|
||||
import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;
|
||||
import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
|
||||
import io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer;
|
||||
import java.util.Map;
|
||||
import lombok.SneakyThrows;
|
||||
import org.apache.kafka.common.serialization.Serializer;
|
||||
|
||||
class ProtobufSchemaRegistrySerializer extends SchemaRegistrySerializer<Message> {
|
||||
|
||||
@SneakyThrows
|
||||
public ProtobufSchemaRegistrySerializer(String topic, boolean isKey,
|
||||
SchemaRegistryClient client, SchemaMetadata schema) {
|
||||
super(topic, isKey, client, schema);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Serializer<Message> createSerializer(SchemaRegistryClient client) {
|
||||
var serializer = new KafkaProtobufSerializer<>(client);
|
||||
serializer.configure(
|
||||
Map.of(
|
||||
"schema.registry.url", "wontbeused",
|
||||
AbstractKafkaSchemaSerDeConfig.AUTO_REGISTER_SCHEMAS, false,
|
||||
AbstractKafkaSchemaSerDeConfig.USE_LATEST_VERSION, true
|
||||
),
|
||||
isKey
|
||||
);
|
||||
return serializer;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Message serialize(String value, ParsedSchema schema) {
|
||||
ProtobufSchema protobufSchema = (ProtobufSchema) schema;
|
||||
DynamicMessage.Builder builder = protobufSchema.newMessageBuilder();
|
||||
try {
|
||||
JsonFormat.parser().merge(value, builder);
|
||||
return builder.build();
|
||||
} catch (Throwable e) {
|
||||
throw new RuntimeException("Failed to serialize record for topic " + topic, e);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -1,5 +1,8 @@
|
|||
package com.provectus.kafka.ui.serdes.builtin.sr;
|
||||
|
||||
import static com.provectus.kafka.ui.serdes.builtin.sr.Serialize.serializeAvro;
|
||||
import static com.provectus.kafka.ui.serdes.builtin.sr.Serialize.serializeJson;
|
||||
import static com.provectus.kafka.ui.serdes.builtin.sr.Serialize.serializeProto;
|
||||
import static io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig.BASIC_AUTH_CREDENTIALS_SOURCE;
|
||||
import static io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig.USER_INFO_CONFIG;
|
||||
|
||||
|
@ -7,7 +10,6 @@ import com.google.common.annotations.VisibleForTesting;
|
|||
import com.provectus.kafka.ui.exception.ValidationException;
|
||||
import com.provectus.kafka.ui.serde.api.DeserializeResult;
|
||||
import com.provectus.kafka.ui.serde.api.PropertyResolver;
|
||||
import com.provectus.kafka.ui.serde.api.RecordHeaders;
|
||||
import com.provectus.kafka.ui.serde.api.SchemaDescription;
|
||||
import com.provectus.kafka.ui.serdes.BuiltInSerde;
|
||||
import com.provectus.kafka.ui.util.jsonschema.AvroJsonSchemaConverter;
|
||||
|
@ -32,17 +34,21 @@ import java.util.Map;
|
|||
import java.util.Optional;
|
||||
import java.util.concurrent.Callable;
|
||||
import javax.annotation.Nullable;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.SneakyThrows;
|
||||
import org.apache.kafka.common.config.SslConfigs;
|
||||
|
||||
|
||||
public class SchemaRegistrySerde implements BuiltInSerde {
|
||||
|
||||
private static final byte SR_PAYLOAD_MAGIC_BYTE = 0x0;
|
||||
private static final int SR_PAYLOAD_PREFIX_LENGTH = 5;
|
||||
|
||||
public static String name() {
|
||||
return "SchemaRegistry";
|
||||
}
|
||||
|
||||
private static final String SCHEMA_REGISTRY = "schemaRegistry";
|
||||
|
||||
private SchemaRegistryClient schemaRegistryClient;
|
||||
private List<String> schemaRegistryUrls;
|
||||
private String valueSchemaNameTemplate;
|
||||
|
@ -54,7 +60,7 @@ public class SchemaRegistrySerde implements BuiltInSerde {
|
|||
@Override
|
||||
public boolean canBeAutoConfigured(PropertyResolver kafkaClusterProperties,
|
||||
PropertyResolver globalProperties) {
|
||||
return kafkaClusterProperties.getListProperty("schemaRegistry", String.class)
|
||||
return kafkaClusterProperties.getListProperty(SCHEMA_REGISTRY, String.class)
|
||||
.filter(lst -> !lst.isEmpty())
|
||||
.isPresent();
|
||||
}
|
||||
|
@ -62,7 +68,7 @@ public class SchemaRegistrySerde implements BuiltInSerde {
|
|||
@Override
|
||||
public void autoConfigure(PropertyResolver kafkaClusterProperties,
|
||||
PropertyResolver globalProperties) {
|
||||
var urls = kafkaClusterProperties.getListProperty("schemaRegistry", String.class)
|
||||
var urls = kafkaClusterProperties.getListProperty(SCHEMA_REGISTRY, String.class)
|
||||
.filter(lst -> !lst.isEmpty())
|
||||
.orElseThrow(() -> new ValidationException("No urls provided for schema registry"));
|
||||
configure(
|
||||
|
@ -88,7 +94,7 @@ public class SchemaRegistrySerde implements BuiltInSerde {
|
|||
PropertyResolver kafkaClusterProperties,
|
||||
PropertyResolver globalProperties) {
|
||||
var urls = serdeProperties.getListProperty("url", String.class)
|
||||
.or(() -> kafkaClusterProperties.getListProperty("schemaRegistry", String.class))
|
||||
.or(() -> kafkaClusterProperties.getListProperty(SCHEMA_REGISTRY, String.class))
|
||||
.filter(lst -> !lst.isEmpty())
|
||||
.orElseThrow(() -> new ValidationException("No urls provided for schema registry"));
|
||||
configure(
|
||||
|
@ -219,8 +225,8 @@ public class SchemaRegistrySerde implements BuiltInSerde {
|
|||
.convert(basePath, ((AvroSchema) parsedSchema).rawSchema())
|
||||
.toJson();
|
||||
case JSON ->
|
||||
//need to use confluent JsonSchema since it includes resolved references
|
||||
((JsonSchema) parsedSchema).rawSchema().toString();
|
||||
//need to use confluent JsonSchema since it includes resolved references
|
||||
((JsonSchema) parsedSchema).rawSchema().toString();
|
||||
};
|
||||
}
|
||||
|
||||
|
@ -252,35 +258,27 @@ public class SchemaRegistrySerde implements BuiltInSerde {
|
|||
@Override
|
||||
public Serializer serializer(String topic, Target type) {
|
||||
String subject = schemaSubject(topic, type);
|
||||
var schema = getSchemaBySubject(subject)
|
||||
.orElseThrow(() -> new ValidationException(String.format("No schema for subject '%s' found", subject)));
|
||||
boolean isKey = type == Target.KEY;
|
||||
SchemaType schemaType = SchemaType.fromString(schema.getSchemaType())
|
||||
.orElseThrow(() -> new IllegalStateException("Unknown schema type: " + schema.getSchemaType()));
|
||||
SchemaMetadata meta = getSchemaBySubject(subject)
|
||||
.orElseThrow(() -> new ValidationException(
|
||||
String.format("No schema for subject '%s' found", subject)));
|
||||
ParsedSchema schema = getSchemaById(meta.getId())
|
||||
.orElseThrow(() -> new IllegalStateException(
|
||||
String.format("Schema found for id %s, subject '%s'", meta.getId(), subject)));
|
||||
SchemaType schemaType = SchemaType.fromString(meta.getSchemaType())
|
||||
.orElseThrow(() -> new IllegalStateException("Unknown schema type: " + meta.getSchemaType()));
|
||||
return switch (schemaType) {
|
||||
case PROTOBUF -> new ProtobufSchemaRegistrySerializer(topic, isKey, schemaRegistryClient, schema);
|
||||
case AVRO -> new AvroSchemaRegistrySerializer(topic, isKey, schemaRegistryClient, schema);
|
||||
case JSON -> new JsonSchemaSchemaRegistrySerializer(topic, isKey, schemaRegistryClient, schema);
|
||||
case PROTOBUF -> input ->
|
||||
serializeProto(schemaRegistryClient, topic, type, (ProtobufSchema) schema, meta.getId(), input);
|
||||
case AVRO -> input ->
|
||||
serializeAvro((AvroSchema) schema, meta.getId(), input);
|
||||
case JSON -> input ->
|
||||
serializeJson((JsonSchema) schema, meta.getId(), input);
|
||||
};
|
||||
}
|
||||
|
||||
@Override
|
||||
public Deserializer deserializer(String topic, Target type) {
|
||||
return new SrDeserializer(topic);
|
||||
}
|
||||
|
||||
///--------------------------------------------------------------
|
||||
|
||||
private static final byte SR_RECORD_MAGIC_BYTE = (byte) 0;
|
||||
private static final int SR_RECORD_PREFIX_LENGTH = 5;
|
||||
|
||||
@RequiredArgsConstructor
|
||||
private class SrDeserializer implements Deserializer {
|
||||
|
||||
private final String topic;
|
||||
|
||||
@Override
|
||||
public DeserializeResult deserialize(RecordHeaders headers, byte[] data) {
|
||||
return (headers, data) -> {
|
||||
var schemaId = extractSchemaIdFromMsg(data);
|
||||
SchemaType format = getMessageFormatBySchemaId(schemaId);
|
||||
MessageFormatter formatter = schemaRegistryFormatters.get(format);
|
||||
|
@ -292,7 +290,7 @@ public class SchemaRegistrySerde implements BuiltInSerde {
|
|||
"type", format.name()
|
||||
)
|
||||
);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
private SchemaType getMessageFormatBySchemaId(int schemaId) {
|
||||
|
@ -304,7 +302,7 @@ public class SchemaRegistrySerde implements BuiltInSerde {
|
|||
|
||||
private int extractSchemaIdFromMsg(byte[] data) {
|
||||
ByteBuffer buffer = ByteBuffer.wrap(data);
|
||||
if (buffer.remaining() > SR_RECORD_PREFIX_LENGTH && buffer.get() == SR_RECORD_MAGIC_BYTE) {
|
||||
if (buffer.remaining() >= SR_PAYLOAD_PREFIX_LENGTH && buffer.get() == SR_PAYLOAD_MAGIC_BYTE) {
|
||||
return buffer.getInt();
|
||||
}
|
||||
throw new ValidationException(
|
||||
|
|
|
@ -1,34 +0,0 @@
|
|||
package com.provectus.kafka.ui.serdes.builtin.sr;
|
||||
|
||||
import com.provectus.kafka.ui.serde.api.Serde;
|
||||
import io.confluent.kafka.schemaregistry.ParsedSchema;
|
||||
import io.confluent.kafka.schemaregistry.client.SchemaMetadata;
|
||||
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
|
||||
import lombok.SneakyThrows;
|
||||
import org.apache.kafka.common.serialization.Serializer;
|
||||
|
||||
abstract class SchemaRegistrySerializer<T> implements Serde.Serializer {
|
||||
protected final Serializer<T> serializer;
|
||||
protected final String topic;
|
||||
protected final boolean isKey;
|
||||
protected final ParsedSchema schema;
|
||||
|
||||
@SneakyThrows
|
||||
protected SchemaRegistrySerializer(String topic, boolean isKey, SchemaRegistryClient client,
|
||||
SchemaMetadata schema) {
|
||||
this.topic = topic;
|
||||
this.isKey = isKey;
|
||||
this.serializer = createSerializer(client);
|
||||
this.schema = client.getSchemaById(schema.getId());
|
||||
}
|
||||
|
||||
protected abstract Serializer<T> createSerializer(SchemaRegistryClient client);
|
||||
|
||||
@Override
|
||||
public byte[] serialize(String input) {
|
||||
final T read = this.serialize(input, schema);
|
||||
return this.serializer.serialize(topic, read);
|
||||
}
|
||||
|
||||
protected abstract T serialize(String value, ParsedSchema schema);
|
||||
}
|
|
@ -0,0 +1,126 @@
|
|||
package com.provectus.kafka.ui.serdes.builtin.sr;
|
||||
|
||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||
import com.fasterxml.jackson.databind.JsonNode;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.protobuf.DynamicMessage;
|
||||
import com.google.protobuf.Message;
|
||||
import com.google.protobuf.util.JsonFormat;
|
||||
import com.provectus.kafka.ui.exception.ValidationException;
|
||||
import com.provectus.kafka.ui.serde.api.Serde;
|
||||
import com.provectus.kafka.ui.util.annotation.KafkaClientInternalsDependant;
|
||||
import com.provectus.kafka.ui.util.jsonschema.JsonAvroConversion;
|
||||
import io.confluent.kafka.schemaregistry.avro.AvroSchema;
|
||||
import io.confluent.kafka.schemaregistry.avro.AvroSchemaUtils;
|
||||
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
|
||||
import io.confluent.kafka.schemaregistry.json.JsonSchema;
|
||||
import io.confluent.kafka.schemaregistry.json.jackson.Jackson;
|
||||
import io.confluent.kafka.schemaregistry.protobuf.MessageIndexes;
|
||||
import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;
|
||||
import io.confluent.kafka.serializers.protobuf.AbstractKafkaProtobufSerializer;
|
||||
import io.confluent.kafka.serializers.subject.DefaultReferenceSubjectNameStrategy;
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.HashMap;
|
||||
import lombok.SneakyThrows;
|
||||
import org.apache.avro.Schema;
|
||||
import org.apache.avro.io.BinaryEncoder;
|
||||
import org.apache.avro.io.DatumWriter;
|
||||
import org.apache.avro.io.EncoderFactory;
|
||||
|
||||
final class Serialize {
|
||||
|
||||
private static final byte MAGIC = 0x0;
|
||||
private static final ObjectMapper JSON_SERIALIZE_MAPPER = Jackson.newObjectMapper(); //from confluent package
|
||||
|
||||
private Serialize() {
|
||||
}
|
||||
|
||||
@KafkaClientInternalsDependant("AbstractKafkaJsonSchemaSerializer::serializeImpl")
|
||||
@SneakyThrows
|
||||
static byte[] serializeJson(JsonSchema schema, int schemaId, String value) {
|
||||
JsonNode json;
|
||||
try {
|
||||
json = JSON_SERIALIZE_MAPPER.readTree(value);
|
||||
} catch (JsonProcessingException e) {
|
||||
throw new ValidationException(String.format("'%s' is not valid json", value));
|
||||
}
|
||||
try {
|
||||
schema.validate(json);
|
||||
} catch (org.everit.json.schema.ValidationException e) {
|
||||
throw new ValidationException(
|
||||
String.format("'%s' does not fit schema: %s", value, e.getAllMessages()));
|
||||
}
|
||||
try (var out = new ByteArrayOutputStream()) {
|
||||
out.write(MAGIC);
|
||||
out.write(schemaId(schemaId));
|
||||
out.write(JSON_SERIALIZE_MAPPER.writeValueAsBytes(json));
|
||||
return out.toByteArray();
|
||||
}
|
||||
}
|
||||
|
||||
@KafkaClientInternalsDependant("AbstractKafkaProtobufSerializer::serializeImpl")
|
||||
@SneakyThrows
|
||||
static byte[] serializeProto(SchemaRegistryClient srClient,
|
||||
String topic,
|
||||
Serde.Target target,
|
||||
ProtobufSchema schema,
|
||||
int schemaId,
|
||||
String input) {
|
||||
// flags are tuned like in ProtobufSerializer by default
|
||||
boolean normalizeSchema = false;
|
||||
boolean autoRegisterSchema = false;
|
||||
boolean useLatestVersion = true;
|
||||
boolean latestCompatStrict = true;
|
||||
boolean skipKnownTypes = true;
|
||||
|
||||
schema = AbstractKafkaProtobufSerializer.resolveDependencies(
|
||||
srClient, normalizeSchema, autoRegisterSchema, useLatestVersion, latestCompatStrict,
|
||||
new HashMap<>(), skipKnownTypes, new DefaultReferenceSubjectNameStrategy(),
|
||||
topic, target == Serde.Target.KEY, schema
|
||||
);
|
||||
|
||||
DynamicMessage.Builder builder = schema.newMessageBuilder();
|
||||
JsonFormat.parser().merge(input, builder);
|
||||
Message message = builder.build();
|
||||
MessageIndexes indexes = schema.toMessageIndexes(message.getDescriptorForType().getFullName(), normalizeSchema);
|
||||
try (var out = new ByteArrayOutputStream()) {
|
||||
out.write(MAGIC);
|
||||
out.write(schemaId(schemaId));
|
||||
out.write(indexes.toByteArray());
|
||||
message.writeTo(out);
|
||||
return out.toByteArray();
|
||||
}
|
||||
}
|
||||
|
||||
@KafkaClientInternalsDependant("AbstractKafkaAvroSerializer::serializeImpl")
|
||||
@SneakyThrows
|
||||
static byte[] serializeAvro(AvroSchema schema, int schemaId, String input) {
|
||||
var avroObject = JsonAvroConversion.convertJsonToAvro(input, schema.rawSchema());
|
||||
try (var out = new ByteArrayOutputStream()) {
|
||||
out.write(MAGIC);
|
||||
out.write(schemaId(schemaId));
|
||||
Schema rawSchema = schema.rawSchema();
|
||||
if (rawSchema.getType().equals(Schema.Type.BYTES)) {
|
||||
Preconditions.checkState(
|
||||
avroObject instanceof ByteBuffer,
|
||||
"Unrecognized bytes object of type: " + avroObject.getClass().getName()
|
||||
);
|
||||
out.write(((ByteBuffer) avroObject).array());
|
||||
} else {
|
||||
boolean useLogicalTypeConverters = true;
|
||||
BinaryEncoder encoder = EncoderFactory.get().directBinaryEncoder(out, null);
|
||||
DatumWriter<Object> writer =
|
||||
(DatumWriter<Object>) AvroSchemaUtils.getDatumWriter(avroObject, rawSchema, useLogicalTypeConverters);
|
||||
writer.write(avroObject, encoder);
|
||||
encoder.flush();
|
||||
}
|
||||
return out.toByteArray();
|
||||
}
|
||||
}
|
||||
|
||||
private static byte[] schemaId(int id) {
|
||||
return ByteBuffer.allocate(Integer.BYTES).putInt(id).array();
|
||||
}
|
||||
}
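For reference, the Serialize helpers above all emit the Confluent Schema Registry wire format: a zero magic byte, a 4-byte big-endian schema id, then the encoded body (the protobuf path additionally writes message indexes before the body). A hedged sketch of splitting such a frame, with serialized standing in for any byte[] these helpers produce:

    // Sketch only: reading back the framing that serializeJson/serializeProto/serializeAvro write.
    ByteBuffer buf = ByteBuffer.wrap(serialized);
    byte magic = buf.get();                  // expected to be 0x0
    int schemaId = buf.getInt();             // id the schema is registered under
    byte[] body = new byte[buf.remaining()];
    buf.get(body);                           // AVRO/JSON bytes (or message indexes + protobuf bytes)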
|
|
@ -2,7 +2,7 @@ package com.provectus.kafka.ui.service;

import com.provectus.kafka.ui.config.ClustersProperties;
import com.provectus.kafka.ui.model.KafkaCluster;
import com.provectus.kafka.ui.util.KafkaClientSslPropertiesUtil;
import com.provectus.kafka.ui.util.SslPropertiesUtil;
import java.io.Closeable;
import java.time.Instant;
import java.util.Map;

@ -42,7 +42,7 @@ public class AdminClientServiceImpl implements AdminClientService, Closeable {
private Mono<ReactiveAdminClient> createAdminClient(KafkaCluster cluster) {
return Mono.fromSupplier(() -> {
Properties properties = new Properties();
KafkaClientSslPropertiesUtil.addKafkaSslProperties(cluster.getOriginalProperties().getSsl(), properties);
SslPropertiesUtil.addKafkaSslProperties(cluster.getOriginalProperties().getSsl(), properties);
properties.putAll(cluster.getProperties());
properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.getBootstrapServers());
properties.putIfAbsent(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, clientTimeout);

@ -10,7 +10,7 @@ import com.provectus.kafka.ui.model.KafkaCluster;
import com.provectus.kafka.ui.model.SortOrderDTO;
import com.provectus.kafka.ui.service.rbac.AccessControlService;
import com.provectus.kafka.ui.util.ApplicationMetrics;
import com.provectus.kafka.ui.util.KafkaClientSslPropertiesUtil;
import com.provectus.kafka.ui.util.SslPropertiesUtil;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;

@ -254,7 +254,7 @@ public class ConsumerGroupService {
public EnhancedConsumer createConsumer(KafkaCluster cluster,
Map<String, Object> properties) {
Properties props = new Properties();
KafkaClientSslPropertiesUtil.addKafkaSslProperties(cluster.getOriginalProperties().getSsl(), props);
SslPropertiesUtil.addKafkaSslProperties(cluster.getOriginalProperties().getSsl(), props);
props.putAll(cluster.getProperties());
props.put(ConsumerConfig.CLIENT_ID_CONFIG, "kafka-ui-consumer-" + System.currentTimeMillis());
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.getBootstrapServers());

@ -102,7 +102,8 @@ public class DeserializationService implements Closeable {
valueSerde.deserializer(topic, Serde.Target.VALUE),
fallbackSerde.getName(),
fallbackSerde.deserializer(topic, Serde.Target.KEY),
fallbackSerde.deserializer(topic, Serde.Target.VALUE)
fallbackSerde.deserializer(topic, Serde.Target.VALUE),
cluster.getMasking().getMaskerForTopic(topic)
);
}

@ -2,10 +2,9 @@ package com.provectus.kafka.ui.service;

import com.google.common.util.concurrent.RateLimiter;
import com.provectus.kafka.ui.config.ClustersProperties;
import com.provectus.kafka.ui.emitter.BackwardRecordEmitter;
import com.provectus.kafka.ui.emitter.ForwardRecordEmitter;
import com.provectus.kafka.ui.emitter.BackwardEmitter;
import com.provectus.kafka.ui.emitter.ForwardEmitter;
import com.provectus.kafka.ui.emitter.MessageFilters;
import com.provectus.kafka.ui.emitter.MessagesProcessing;
import com.provectus.kafka.ui.emitter.TailingEmitter;
import com.provectus.kafka.ui.exception.TopicNotFoundException;
import com.provectus.kafka.ui.exception.ValidationException;

@ -18,9 +17,8 @@ import com.provectus.kafka.ui.model.SmartFilterTestExecutionDTO;
import com.provectus.kafka.ui.model.SmartFilterTestExecutionResultDTO;
import com.provectus.kafka.ui.model.TopicMessageDTO;
import com.provectus.kafka.ui.model.TopicMessageEventDTO;
import com.provectus.kafka.ui.serde.api.Serde;
import com.provectus.kafka.ui.serdes.ProducerRecordCreator;
import com.provectus.kafka.ui.util.KafkaClientSslPropertiesUtil;
import com.provectus.kafka.ui.util.SslPropertiesUtil;
import java.time.Instant;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;

@ -45,7 +43,6 @@ import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

@ -191,7 +188,7 @@ public class MessagesService {
public static KafkaProducer<byte[], byte[]> createProducer(KafkaCluster cluster,
Map<String, Object> additionalProps) {
Properties properties = new Properties();
KafkaClientSslPropertiesUtil.addKafkaSslProperties(cluster.getOriginalProperties().getSsl(), properties);
SslPropertiesUtil.addKafkaSslProperties(cluster.getOriginalProperties().getSsl(), properties);
properties.putAll(cluster.getProperties());
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.getBootstrapServers());
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);

@ -231,54 +228,24 @@ public class MessagesService {
@Nullable String keySerde,
@Nullable String valueSerde) {

java.util.function.Consumer<? super FluxSink<TopicMessageEventDTO>> emitter;

var processing = new MessagesProcessing(
deserializationService.deserializerFor(cluster, topic, keySerde, valueSerde),
getMsgFilter(query, filterQueryType),
seekDirection == SeekDirectionDTO.TAILING ? null : limit
);

if (seekDirection.equals(SeekDirectionDTO.FORWARD)) {
emitter = new ForwardRecordEmitter(
var deserializer = deserializationService.deserializerFor(cluster, topic, keySerde, valueSerde);
var filter = getMsgFilter(query, filterQueryType);
var emitter = switch (seekDirection) {
case FORWARD -> new ForwardEmitter(
() -> consumerGroupService.createConsumer(cluster),
consumerPosition,
processing,
cluster.getPollingSettings()
consumerPosition, limit, deserializer, filter, cluster.getPollingSettings()
);
} else if (seekDirection.equals(SeekDirectionDTO.BACKWARD)) {
emitter = new BackwardRecordEmitter(
case BACKWARD -> new BackwardEmitter(
() -> consumerGroupService.createConsumer(cluster),
consumerPosition,
limit,
processing,
cluster.getPollingSettings()
consumerPosition, limit, deserializer, filter, cluster.getPollingSettings()
);
} else {
emitter = new TailingEmitter(
case TAILING -> new TailingEmitter(
() -> consumerGroupService.createConsumer(cluster),
consumerPosition,
processing,
cluster.getPollingSettings()
consumerPosition, deserializer, filter, cluster.getPollingSettings()
);
}
return Flux.create(emitter)
.map(getDataMasker(cluster, topic))
.map(throttleUiPublish(seekDirection));
}

private UnaryOperator<TopicMessageEventDTO> getDataMasker(KafkaCluster cluster, String topicName) {
var keyMasker = cluster.getMasking().getMaskingFunction(topicName, Serde.Target.KEY);
var valMasker = cluster.getMasking().getMaskingFunction(topicName, Serde.Target.VALUE);
return evt -> {
if (evt.getType() != TopicMessageEventDTO.TypeEnum.MESSAGE) {
return evt;
}
return evt.message(
evt.getMessage()
.key(keyMasker.apply(evt.getMessage().getKey()))
.content(valMasker.apply(evt.getMessage().getContent())));
};
return Flux.create(emitter)
.map(throttleUiPublish(seekDirection));
}

private Predicate<TopicMessageDTO> getMsgFilter(String query,

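The last hunk above replaces an if/else chain over the seek direction with a Java switch expression. A standalone sketch of that pattern; the enum and the printed actions are placeholders, not the project's real emitter types:

enum SeekDirection { FORWARD, BACKWARD, TAILING }

class EmitterChooser {
  static Runnable choose(SeekDirection direction) {
    // Exhaustive switch expression: the compiler rejects a missing enum case.
    return switch (direction) {
      case FORWARD -> () -> System.out.println("poll from the given position forward");
      case BACKWARD -> () -> System.out.println("poll backwards towards the given position");
      case TAILING -> () -> System.out.println("keep polling for new records");
    };
  }
}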
@ -31,6 +31,7 @@ import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import javax.annotation.Nullable;
import lombok.AccessLevel;

@ -55,6 +56,7 @@ import org.apache.kafka.clients.admin.NewPartitionReassignment;
import org.apache.kafka.clients.admin.NewPartitions;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.admin.ProducerState;
import org.apache.kafka.clients.admin.RecordsToDelete;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;

@ -658,6 +660,21 @@ public class ReactiveAdminClient implements Closeable {
return toMono(client.alterReplicaLogDirs(replicaAssignment).all());
}

// returns tp -> list of active producer's states (if any)
public Mono<Map<TopicPartition, List<ProducerState>>> getActiveProducersState(String topic) {
return describeTopic(topic)
.map(td -> client.describeProducers(
IntStream.range(0, td.partitions().size())
.mapToObj(i -> new TopicPartition(topic, i))
.toList()
).all()
)
.flatMap(ReactiveAdminClient::toMono)
.map(map -> map.entrySet().stream()
.filter(e -> !e.getValue().activeProducers().isEmpty()) // skipping partitions without producers
.collect(toMap(Map.Entry::getKey, e -> e.getValue().activeProducers())));
}

private Mono<Void> incrementalAlterConfig(String topicName,
List<ConfigEntry> currentConfigs,
Map<String, String> newConfigs) {

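getActiveProducersState() above is a reactive wrapper around the admin client's describeProducers call. A hedged sketch of the same query with the plain, blocking admin client; the broker address and topic name are placeholders:

import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.common.TopicPartition;

class ProducerStatesExample {
  public static void main(String[] args) throws Exception {
    Properties props = new Properties();
    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    try (Admin admin = Admin.create(props)) {
      var partitions = List.of(new TopicPartition("orders", 0), new TopicPartition("orders", 1));
      // tp -> PartitionProducerState; activeProducers() may be empty for idle partitions
      var states = admin.describeProducers(partitions).all().get();
      states.forEach((tp, state) -> state.activeProducers()
          .forEach(p -> System.out.printf("%s: producerId=%d epoch=%d%n", tp, p.producerId(), p.producerEpoch())));
    }
  }
}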
@ -39,6 +39,7 @@ import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.admin.NewPartitionReassignment;
import org.apache.kafka.clients.admin.NewPartitions;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.admin.ProducerState;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;

@ -459,6 +460,11 @@ public class TopicsService {
);
}

public Mono<Map<TopicPartition, List<ProducerState>>> getActiveProducersState(KafkaCluster cluster, String topic) {
return adminClientService.get(cluster)
.flatMap(ac -> ac.getActiveProducersState(topic));
}

private Mono<List<String>> filterExisting(KafkaCluster cluster, Collection<String> topics) {
return adminClientService.get(cluster)
.flatMap(ac -> ac.listTopics(true))

@ -92,14 +92,12 @@ class AnalysisTasksStore {
.result(completedState);
}

@Value
@Builder(toBuilder = true)
private static class RunningAnalysis {
Instant startedAt;
double completenessPercent;
long msgsScanned;
long bytesScanned;
Closeable task;
private record RunningAnalysis(Instant startedAt,
double completenessPercent,
long msgsScanned,
long bytesScanned,
Closeable task) {

TopicAnalysisProgressDTO toDto() {
return new TopicAnalysisProgressDTO()

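The hunk above swaps a Lombok @Value/@Builder holder for a plain Java record, which yields the same immutable carrier without the annotations. A generic illustration with placeholder field names:

// Roughly equivalent shapes: the record auto-generates the constructor,
// accessors, equals/hashCode and toString that Lombok's @Value produced.
record Progress(double completenessPercent, long msgsScanned, long bytesScanned) { }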
@ -1,10 +1,11 @@
package com.provectus.kafka.ui.service.analyze;

import com.provectus.kafka.ui.emitter.EmptyPollsCounter;
import static com.provectus.kafka.ui.model.SeekTypeDTO.BEGINNING;

import com.provectus.kafka.ui.emitter.EnhancedConsumer;
import com.provectus.kafka.ui.emitter.OffsetsInfo;
import com.provectus.kafka.ui.emitter.PollingSettings;
import com.provectus.kafka.ui.emitter.SeekOperations;
import com.provectus.kafka.ui.exception.TopicAnalysisException;
import com.provectus.kafka.ui.model.ConsumerPosition;
import com.provectus.kafka.ui.model.KafkaCluster;
import com.provectus.kafka.ui.model.TopicAnalysisDTO;
import com.provectus.kafka.ui.service.ConsumerGroupService;

@ -15,16 +16,14 @@ import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.errors.WakeupException;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

@ -33,6 +32,14 @@ import reactor.core.scheduler.Schedulers;
@RequiredArgsConstructor
public class TopicAnalysisService {

private static final Scheduler SCHEDULER = Schedulers.newBoundedElastic(
Schedulers.DEFAULT_BOUNDED_ELASTIC_SIZE,
Schedulers.DEFAULT_BOUNDED_ELASTIC_QUEUESIZE,
"topic-analysis-tasks",
10, //ttl for idle threads (in sec)
true //daemon
);

private final AnalysisTasksStore analysisTasksStore = new AnalysisTasksStore();

private final TopicsService topicsService;

@ -40,30 +47,18 @@ public class TopicAnalysisService {

public Mono<Void> analyze(KafkaCluster cluster, String topicName) {
return topicsService.getTopicDetails(cluster, topicName)
.doOnNext(topic ->
startAnalysis(
cluster,
topicName,
topic.getPartitionCount(),
topic.getPartitions().values()
.stream()
.mapToLong(p -> p.getOffsetMax() - p.getOffsetMin())
.sum()
)
).then();
.doOnNext(topic -> startAnalysis(cluster, topicName))
.then();
}

private synchronized void startAnalysis(KafkaCluster cluster,
String topic,
int partitionsCnt,
long approxNumberOfMsgs) {
private synchronized void startAnalysis(KafkaCluster cluster, String topic) {
var topicId = new TopicIdentity(cluster, topic);
if (analysisTasksStore.isAnalysisInProgress(topicId)) {
throw new TopicAnalysisException("Topic is already analyzing");
}
var task = new AnalysisTask(cluster, topicId, partitionsCnt, approxNumberOfMsgs, cluster.getPollingSettings());
var task = new AnalysisTask(cluster, topicId);
analysisTasksStore.registerNewTask(topicId, task);
Schedulers.boundedElastic().schedule(task);
SCHEDULER.schedule(task);
}

public void cancelAnalysis(KafkaCluster cluster, String topicName) {

@ -79,20 +74,14 @@ public class TopicAnalysisService {
private final Instant startedAt = Instant.now();

private final TopicIdentity topicId;
private final int partitionsCnt;
private final long approxNumberOfMsgs;
private final EmptyPollsCounter emptyPollsCounter;

private final TopicAnalysisStats totalStats = new TopicAnalysisStats();
private final Map<Integer, TopicAnalysisStats> partitionStats = new HashMap<>();

private final EnhancedConsumer consumer;

AnalysisTask(KafkaCluster cluster, TopicIdentity topicId, int partitionsCnt,
long approxNumberOfMsgs, PollingSettings pollingSettings) {
AnalysisTask(KafkaCluster cluster, TopicIdentity topicId) {
this.topicId = topicId;
this.approxNumberOfMsgs = approxNumberOfMsgs;
this.partitionsCnt = partitionsCnt;
this.consumer = consumerGroupService.createConsumer(
cluster,
// to improve polling throughput

@ -101,7 +90,6 @@ public class TopicAnalysisService {
ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "100000"
)
);
this.emptyPollsCounter = pollingSettings.createEmptyPollsCounter();
}

@Override

@ -113,23 +101,20 @@ public class TopicAnalysisService {
public void run() {
try {
log.info("Starting {} topic analysis", topicId);
var topicPartitions = IntStream.range(0, partitionsCnt)
.peek(i -> partitionStats.put(i, new TopicAnalysisStats()))
.mapToObj(i -> new TopicPartition(topicId.topicName, i))
.collect(Collectors.toList());
consumer.partitionsFor(topicId.topicName)
.forEach(tp -> partitionStats.put(tp.partition(), new TopicAnalysisStats()));

consumer.assign(topicPartitions);
consumer.seekToBeginning(topicPartitions);
var seekOperations = SeekOperations.create(consumer, new ConsumerPosition(BEGINNING, topicId.topicName, null));
long summaryOffsetsRange = seekOperations.summaryOffsetsRange();
seekOperations.assignAndSeekNonEmptyPartitions();

var offsetsInfo = new OffsetsInfo(consumer, topicId.topicName);
while (!offsetsInfo.assignedPartitionsFullyPolled() && !emptyPollsCounter.noDataEmptyPollsReached()) {
while (!seekOperations.assignedPartitionsFullyPolled()) {
var polled = consumer.pollEnhanced(Duration.ofSeconds(3));
emptyPollsCounter.count(polled.count());
polled.forEach(r -> {
totalStats.apply(r);
partitionStats.get(r.partition()).apply(r);
});
updateProgress();
updateProgress(seekOperations.offsetsProcessedFromSeek(), summaryOffsetsRange);
}
analysisTasksStore.setAnalysisResult(topicId, startedAt, totalStats, partitionStats);
log.info("{} topic analysis finished", topicId);

@ -145,13 +130,13 @@ public class TopicAnalysisService {
}
}

private void updateProgress() {
if (totalStats.totalMsgs > 0 && approxNumberOfMsgs != 0) {
private void updateProgress(long processedOffsets, long summaryOffsetsRange) {
if (processedOffsets > 0 && summaryOffsetsRange != 0) {
analysisTasksStore.updateProgress(
topicId,
totalStats.totalMsgs,
totalStats.keysSize.sum + totalStats.valuesSize.sum,
Math.min(100.0, (((double) totalStats.totalMsgs) / approxNumberOfMsgs) * 100)
Math.min(100.0, (((double) processedOffsets) / summaryOffsetsRange) * 100)
);
}
}

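Worked example for the reworked updateProgress() above: with a summary offsets range of 2,000,000 and 500,000 offsets processed since the seek, the reported completeness is min(100.0, 500000 / 2000000 * 100) = 25.0; the min() cap keeps the figure from exceeding 100% if the processed-offset count overshoots the estimated range.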
@ -6,6 +6,7 @@ import com.provectus.kafka.ui.exception.CustomBaseException;
import com.provectus.kafka.ui.exception.ValidationException;
import com.provectus.kafka.ui.model.rbac.AccessContext;
import com.provectus.kafka.ui.model.rbac.Resource;
import com.provectus.kafka.ui.model.rbac.permission.PermissibleAction;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;

@ -33,16 +34,20 @@ record AuditRecord(String timestamp,
return MAPPER.writeValueAsString(this);
}

record AuditResource(String accessType, Resource type, @Nullable Object id) {
record AuditResource(String accessType, boolean alter, Resource type, @Nullable Object id) {

private static AuditResource create(PermissibleAction action, Resource type, @Nullable Object id) {
return new AuditResource(action.name(), action.isAlter(), type, id);
}

static List<AuditResource> getAccessedResources(AccessContext ctx) {
List<AuditResource> resources = new ArrayList<>();
ctx.getClusterConfigActions()
.forEach(a -> resources.add(new AuditResource(a.name(), Resource.CLUSTERCONFIG, null)));
.forEach(a -> resources.add(create(a, Resource.CLUSTERCONFIG, null)));
ctx.getTopicActions()
.forEach(a -> resources.add(new AuditResource(a.name(), Resource.TOPIC, nameId(ctx.getTopic()))));
.forEach(a -> resources.add(create(a, Resource.TOPIC, nameId(ctx.getTopic()))));
ctx.getConsumerGroupActions()
.forEach(a -> resources.add(new AuditResource(a.name(), Resource.CONSUMER, nameId(ctx.getConsumerGroup()))));
.forEach(a -> resources.add(create(a, Resource.CONSUMER, nameId(ctx.getConsumerGroup()))));
ctx.getConnectActions()
.forEach(a -> {
Map<String, String> resourceId = new LinkedHashMap<>();

@ -50,16 +55,16 @@ record AuditRecord(String timestamp,
if (ctx.getConnector() != null) {
resourceId.put("connector", ctx.getConnector());
}
resources.add(new AuditResource(a.name(), Resource.CONNECT, resourceId));
resources.add(create(a, Resource.CONNECT, resourceId));
});
ctx.getSchemaActions()
.forEach(a -> resources.add(new AuditResource(a.name(), Resource.SCHEMA, nameId(ctx.getSchema()))));
.forEach(a -> resources.add(create(a, Resource.SCHEMA, nameId(ctx.getSchema()))));
ctx.getKsqlActions()
.forEach(a -> resources.add(new AuditResource(a.name(), Resource.KSQL, null)));
.forEach(a -> resources.add(create(a, Resource.KSQL, null)));
ctx.getAclActions()
.forEach(a -> resources.add(new AuditResource(a.name(), Resource.ACL, null)));
.forEach(a -> resources.add(create(a, Resource.ACL, null)));
ctx.getAuditAction()
.forEach(a -> resources.add(new AuditResource(a.name(), Resource.AUDIT, null)));
.forEach(a -> resources.add(create(a, Resource.AUDIT, null)));
return resources;
}

@ -1,11 +1,11 @@
package com.provectus.kafka.ui.service.audit;

import static com.provectus.kafka.ui.config.ClustersProperties.AuditProperties.LogLevel.ALTER_ONLY;
import static com.provectus.kafka.ui.service.MessagesService.createProducer;

import com.google.common.annotations.VisibleForTesting;
import com.provectus.kafka.ui.config.ClustersProperties;
import com.provectus.kafka.ui.config.auth.AuthenticatedUser;
import com.provectus.kafka.ui.config.auth.RbacUser;
import com.provectus.kafka.ui.model.KafkaCluster;
import com.provectus.kafka.ui.model.rbac.AccessContext;
import com.provectus.kafka.ui.service.AdminClientService;

@ -20,6 +20,7 @@ import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.KafkaProducer;

@ -27,7 +28,9 @@ import org.apache.kafka.clients.producer.ProducerConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.security.core.GrantedAuthority;
import org.springframework.security.core.context.SecurityContext;
import org.springframework.security.core.userdetails.UserDetails;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Signal;

@ -80,12 +83,13 @@ public class AuditService implements Closeable {
}
boolean topicAudit = Optional.ofNullable(auditProps.getTopicAuditEnabled()).orElse(false);
boolean consoleAudit = Optional.ofNullable(auditProps.getConsoleAuditEnabled()).orElse(false);
boolean alterLogOnly = Optional.ofNullable(auditProps.getLevel()).map(lvl -> lvl == ALTER_ONLY).orElse(true);
if (!topicAudit && !consoleAudit) {
return Optional.empty();
}
if (!topicAudit) {
log.info("Audit initialization finished for cluster '{}' (console only)", cluster.getName());
return Optional.of(consoleOnlyWriter(cluster));
return Optional.of(consoleOnlyWriter(cluster, alterLogOnly));
}
String auditTopicName = Optional.ofNullable(auditProps.getTopic()).orElse(DEFAULT_AUDIT_TOPIC_NAME);
boolean topicAuditCanBeDone = createTopicIfNeeded(cluster, acSupplier, auditTopicName, auditProps);

@ -95,7 +99,7 @@ public class AuditService implements Closeable {
"Audit initialization finished for cluster '{}' (console only, topic audit init failed)",
cluster.getName()
);
return Optional.of(consoleOnlyWriter(cluster));
return Optional.of(consoleOnlyWriter(cluster, alterLogOnly));
}
return Optional.empty();
}

@ -103,6 +107,7 @@ public class AuditService implements Closeable {
return Optional.of(
new AuditWriter(
cluster.getName(),
alterLogOnly,
auditTopicName,
producerFactory.get(),
consoleAudit ? AUDIT_LOGGER : null

@ -110,8 +115,8 @@ public class AuditService implements Closeable {
);
}

private static AuditWriter consoleOnlyWriter(KafkaCluster cluster) {
return new AuditWriter(cluster.getName(), null, null, AUDIT_LOGGER);
private static AuditWriter consoleOnlyWriter(KafkaCluster cluster, boolean alterLogOnly) {
return new AuditWriter(cluster.getName(), alterLogOnly, null, null, AUDIT_LOGGER);
}

/**

@ -192,8 +197,11 @@ public class AuditService implements Closeable {
if (sig.getContextView().hasKey(key)) {
return sig.getContextView().<Mono<SecurityContext>>get(key)
.map(context -> context.getAuthentication().getPrincipal())
.cast(RbacUser.class)
.map(user -> new AuthenticatedUser(user.name(), user.groups()))
.cast(UserDetails.class)
.map(user -> {
var roles = user.getAuthorities().stream().map(GrantedAuthority::getAuthority).collect(Collectors.toSet());
return new AuthenticatedUser(user.getUsername(), roles);
})
.switchIfEmpty(NO_AUTH_USER);
} else {
return NO_AUTH_USER;

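The audit hunk above now casts the principal to Spring Security's UserDetails instead of the project-specific RbacUser. A hedged, standalone sketch of that authority-to-role mapping; the user object here is built by hand purely for illustration:

import java.util.Set;
import java.util.stream.Collectors;
import org.springframework.security.core.GrantedAuthority;
import org.springframework.security.core.userdetails.User;
import org.springframework.security.core.userdetails.UserDetails;

class AuthorityMappingExample {
  static Set<String> rolesOf(UserDetails user) {
    return user.getAuthorities().stream()
        .map(GrantedAuthority::getAuthority)
        .collect(Collectors.toSet());
  }

  public static void main(String[] args) {
    UserDetails user = User.withUsername("alice").password("n/a").roles("ADMIN", "AUDIT").build();
    // prints something like: alice -> [ROLE_ADMIN, ROLE_AUDIT] (set order may vary)
    System.out.println(user.getUsername() + " -> " + rolesOf(user));
  }
}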
@ -18,6 +18,7 @@ import org.slf4j.Logger;

@Slf4j
record AuditWriter(String clusterName,
boolean logAlterOperationsOnly,
@Nullable String targetTopic,
@Nullable KafkaProducer<byte[], byte[]> producer,
@Nullable Logger consoleLogger) implements Closeable {

@ -39,6 +40,10 @@ record AuditWriter(String clusterName,
}

private void write(AuditRecord rec) {
if (logAlterOperationsOnly && rec.resources().stream().noneMatch(AuditResource::alter)) {
//we should only log alter operations, but this is read-only op
return;
}
String json = rec.toJson();
if (consoleLogger != null) {
consoleLogger.info(json);

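With the new logAlterOperationsOnly flag above, a record whose resources contain only read actions is skipped entirely. A compact, hypothetical illustration of that predicate, with the record shape simplified:

import java.util.List;

class AlterOnlyFilterExample {
  record Res(String accessType, boolean alter) { }

  static boolean shouldWrite(boolean alterOnly, List<Res> resources) {
    // mirrors the guard in write(): drop read-only records in ALTER_ONLY mode
    return !alterOnly || resources.stream().anyMatch(Res::alter);
  }

  public static void main(String[] args) {
    System.out.println(shouldWrite(true, List.of(new Res("VIEW", false))));  // false
    System.out.println(shouldWrite(true, List.of(new Res("EDIT", true))));   // true
    System.out.println(shouldWrite(false, List.of(new Res("VIEW", false)))); // true
  }
}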
@ -8,7 +8,7 @@ import java.util.Objects;
import java.util.function.Function;
import java.util.stream.Stream;
import javax.annotation.Nullable;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections4.CollectionUtils;
import org.opendatadiscovery.oddrn.JdbcUrlParser;
import org.opendatadiscovery.oddrn.model.HivePath;
import org.opendatadiscovery.oddrn.model.MysqlPath;

@ -130,8 +130,8 @@ public class KsqlApiClient {
* Some version of ksqldb (?..0.24) can cut off json streaming without respect proper array ending like <p/>
* <code>[{"header":{"queryId":"...","schema":"..."}}, ]</code>
* which will cause json parsing error and will be propagated to UI.
* This is a know issue(<a href="https://github.com/confluentinc/ksql/issues/8746">...</a>), but we don't know when it will be fixed.
* To work around this we need to check DecodingException err msg.
* This is a know issue(https://github.com/confluentinc/ksql/issues/8746), but we don't know when it will be fixed.
* To workaround this we need to check DecodingException err msg.
*/
private boolean isUnexpectedJsonArrayEndCharException(Throwable th) {
return th instanceof DecodingException

@ -1,7 +1,5 @@
package com.provectus.kafka.ui.service.masking;

import static java.util.stream.Collectors.toList;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.json.JsonMapper;

@ -9,6 +7,7 @@ import com.fasterxml.jackson.databind.node.ContainerNode;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.provectus.kafka.ui.config.ClustersProperties;
import com.provectus.kafka.ui.model.TopicMessageDTO;
import com.provectus.kafka.ui.serde.api.Serde;
import com.provectus.kafka.ui.service.masking.policies.MaskingPolicy;
import java.util.List;

@ -54,7 +53,8 @@ public class DataMasking {
Optional.ofNullable(property.getTopicValuesPattern()).map(Pattern::compile).orElse(null),
MaskingPolicy.create(property)
);
}).collect(toList()));
}).toList()
);
}

@VisibleForTesting

@ -62,8 +62,17 @@ public class DataMasking {
this.masks = masks;
}

public UnaryOperator<String> getMaskingFunction(String topic, Serde.Target target) {
var targetMasks = masks.stream().filter(m -> m.shouldBeApplied(topic, target)).collect(toList());
public UnaryOperator<TopicMessageDTO> getMaskerForTopic(String topic) {
var keyMasker = getMaskingFunction(topic, Serde.Target.KEY);
var valMasker = getMaskingFunction(topic, Serde.Target.VALUE);
return msg -> msg
.key(keyMasker.apply(msg.getKey()))
.content(valMasker.apply(msg.getContent()));
}

@VisibleForTesting
UnaryOperator<String> getMaskingFunction(String topic, Serde.Target target) {
var targetMasks = masks.stream().filter(m -> m.shouldBeApplied(topic, target)).toList();
if (targetMasks.isEmpty()) {
return UnaryOperator.identity();
}

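getMaskerForTopic() above simply composes the key and value masking functions into one message-level operator. A hedged usage sketch with plain string functions standing in for the real masking policies:

import java.util.function.UnaryOperator;

class MaskerCompositionExample {
  record Msg(String key, String content) { }

  static UnaryOperator<Msg> maskerFor(UnaryOperator<String> keyMasker, UnaryOperator<String> valueMasker) {
    return msg -> new Msg(keyMasker.apply(msg.key()), valueMasker.apply(msg.content()));
  }

  public static void main(String[] args) {
    var masker = maskerFor(k -> k, v -> v.replaceAll("\\d", "*")); // placeholder value policy: hide digits
    System.out.println(masker.apply(new Msg("user-1", "card=1234"))); // Msg[key=user-1, content=card=****]
  }
}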
@ -11,6 +11,9 @@ import org.apache.kafka.common.Node;

class WellKnownMetrics {

private static final String BROKER_TOPIC_METRICS = "BrokerTopicMetrics";
private static final String FIFTEEN_MINUTE_RATE = "FifteenMinuteRate";

// per broker
final Map<Integer, BigDecimal> brokerBytesInFifteenMinuteRate = new HashMap<>();
final Map<Integer, BigDecimal> brokerBytesOutFifteenMinuteRate = new HashMap<>();

@ -36,15 +39,15 @@ class WellKnownMetrics {
if (!brokerBytesInFifteenMinuteRate.containsKey(node.id())
&& rawMetric.labels().size() == 1
&& "BytesInPerSec".equalsIgnoreCase(rawMetric.labels().get("name"))
&& containsIgnoreCase(name, "BrokerTopicMetrics")
&& endsWithIgnoreCase(name, "FifteenMinuteRate")) {
&& containsIgnoreCase(name, BROKER_TOPIC_METRICS)
&& endsWithIgnoreCase(name, FIFTEEN_MINUTE_RATE)) {
brokerBytesInFifteenMinuteRate.put(node.id(), rawMetric.value());
}
if (!brokerBytesOutFifteenMinuteRate.containsKey(node.id())
&& rawMetric.labels().size() == 1
&& "BytesOutPerSec".equalsIgnoreCase(rawMetric.labels().get("name"))
&& containsIgnoreCase(name, "BrokerTopicMetrics")
&& endsWithIgnoreCase(name, "FifteenMinuteRate")) {
&& containsIgnoreCase(name, BROKER_TOPIC_METRICS)
&& endsWithIgnoreCase(name, FIFTEEN_MINUTE_RATE)) {
brokerBytesOutFifteenMinuteRate.put(node.id(), rawMetric.value());
}
}

@ -53,8 +56,8 @@ class WellKnownMetrics {
String name = rawMetric.name();
String topic = rawMetric.labels().get("topic");
if (topic != null
&& containsIgnoreCase(name, "BrokerTopicMetrics")
&& endsWithIgnoreCase(name, "FifteenMinuteRate")) {
&& containsIgnoreCase(name, BROKER_TOPIC_METRICS)
&& endsWithIgnoreCase(name, FIFTEEN_MINUTE_RATE)) {
String nameProperty = rawMetric.labels().get("name");
if ("BytesInPerSec".equalsIgnoreCase(nameProperty)) {
bytesInFifteenMinuteRate.compute(topic, (k, v) -> v == null ? rawMetric.value() : v.add(rawMetric.value()));

@ -33,7 +33,7 @@ import java.util.stream.Collectors;
import javax.annotation.Nullable;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.core.env.Environment;

@ -52,6 +52,7 @@ import reactor.core.publisher.Mono;
public class AccessControlService {

private static final String ACCESS_DENIED = "Access denied";
private static final String ACTIONS_ARE_EMPTY = "actions are empty";

@Nullable
private final InMemoryReactiveClientRegistrationRepository clientRegistrationRepository;

@ -206,7 +207,7 @@ public class AccessControlService {
if (context.getTopic() == null && context.getTopicActions().isEmpty()) {
return true;
}
Assert.isTrue(!context.getTopicActions().isEmpty(), "actions are empty");
Assert.isTrue(!context.getTopicActions().isEmpty(), ACTIONS_ARE_EMPTY);

Set<String> requiredActions = context.getTopicActions()
.stream()

@ -243,7 +244,7 @@ public class AccessControlService {
if (context.getConsumerGroup() == null && context.getConsumerGroupActions().isEmpty()) {
return true;
}
Assert.isTrue(!context.getConsumerGroupActions().isEmpty(), "actions are empty");
Assert.isTrue(!context.getConsumerGroupActions().isEmpty(), ACTIONS_ARE_EMPTY);

Set<String> requiredActions = context.getConsumerGroupActions()
.stream()

@ -276,7 +277,7 @@ public class AccessControlService {
if (context.getSchema() == null && context.getSchemaActions().isEmpty()) {
return true;
}
Assert.isTrue(!context.getSchemaActions().isEmpty(), "actions are empty");
Assert.isTrue(!context.getSchemaActions().isEmpty(), ACTIONS_ARE_EMPTY);

Set<String> requiredActions = context.getSchemaActions()
.stream()

@ -309,7 +310,7 @@ public class AccessControlService {
if (context.getConnect() == null && context.getConnectActions().isEmpty()) {
return true;
}
Assert.isTrue(!context.getConnectActions().isEmpty(), "actions are empty");
Assert.isTrue(!context.getConnectActions().isEmpty(), ACTIONS_ARE_EMPTY);

Set<String> requiredActions = context.getConnectActions()
.stream()

@ -59,8 +59,8 @@ public class CognitoAuthorityExtractor implements ProviderAuthorityExtractor {
.stream()
.filter(s -> s.getProvider().equals(Provider.OAUTH_COGNITO))
.filter(s -> s.getType().equals("group"))
.anyMatch(subject -> Stream.of(groups)
.map(Object::toString)
.anyMatch(subject -> groups
.stream()
.anyMatch(cognitoGroup -> cognitoGroup.equals(subject.getValue()))
))
.map(Role::getName)

@ -5,6 +5,8 @@ import static com.provectus.kafka.ui.model.rbac.provider.Provider.Name.GITHUB;
import com.provectus.kafka.ui.model.rbac.Role;
import com.provectus.kafka.ui.model.rbac.provider.Provider;
import com.provectus.kafka.ui.service.rbac.AccessControlService;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;

@ -26,6 +28,8 @@ public class GithubAuthorityExtractor implements ProviderAuthorityExtractor {
private static final String ORGANIZATION_ATTRIBUTE_NAME = "organizations_url";
private static final String USERNAME_ATTRIBUTE_NAME = "login";
private static final String ORGANIZATION_NAME = "login";
private static final String ORGANIZATION = "organization";
private static final String TEAM_NAME = "slug";
private static final String GITHUB_ACCEPT_HEADER = "application/vnd.github+json";
private static final String DUMMY = "dummy";
// The number of results (max 100) per page of list organizations for authenticated user.

@ -46,7 +50,7 @@ public class GithubAuthorityExtractor implements ProviderAuthorityExtractor {
throw new RuntimeException();
}

Set<String> groupsByUsername = new HashSet<>();
Set<String> rolesByUsername = new HashSet<>();
String username = principal.getAttribute(USERNAME_ATTRIBUTE_NAME);
if (username == null) {
log.debug("Github username param is not present");

@ -59,13 +63,7 @@ public class GithubAuthorityExtractor implements ProviderAuthorityExtractor {
.filter(s -> s.getType().equals("user"))
.anyMatch(s -> s.getValue().equals(username)))
.map(Role::getName)
.forEach(groupsByUsername::add);
}

String organization = principal.getAttribute(ORGANIZATION_ATTRIBUTE_NAME);
if (organization == null) {
log.debug("Github organization param is not present");
return Mono.just(groupsByUsername);
.forEach(rolesByUsername::add);
}

OAuth2UserRequest req = (OAuth2UserRequest) additionalParams.get("request");

@ -80,8 +78,24 @@ public class GithubAuthorityExtractor implements ProviderAuthorityExtractor {
.getUserInfoEndpoint()
.getUri();
}
var webClient = WebClient.create(infoEndpoint);

WebClient webClient = WebClient.create(infoEndpoint);
Mono<Set<String>> rolesByOrganization = getOrganizationRoles(principal, additionalParams, acs, webClient);
Mono<Set<String>> rolesByTeams = getTeamRoles(webClient, additionalParams, acs);

return Mono.zip(rolesByOrganization, rolesByTeams)
.map((t) -> Stream.of(t.getT1(), t.getT2(), rolesByUsername)
.flatMap(Collection::stream)
.collect(Collectors.toSet()));
}

private Mono<Set<String>> getOrganizationRoles(DefaultOAuth2User principal, Map<String, Object> additionalParams,
AccessControlService acs, WebClient webClient) {
String organization = principal.getAttribute(ORGANIZATION_ATTRIBUTE_NAME);
if (organization == null) {
log.debug("Github organization param is not present");
return Mono.just(Collections.emptySet());
}

final Mono<List<Map<String, Object>>> userOrganizations = webClient
.get()

@ -99,22 +113,76 @@ public class GithubAuthorityExtractor implements ProviderAuthorityExtractor {
//@formatter:on

return userOrganizations
.map(orgsMap -> {
var groupsByOrg = acs.getRoles()
.stream()
.filter(role -> role.getSubjects()
.stream()
.filter(s -> s.getProvider().equals(Provider.OAUTH_GITHUB))
.filter(s -> s.getType().equals("organization"))
.anyMatch(subject -> orgsMap.stream()
.map(org -> org.get(ORGANIZATION_NAME).toString())
.distinct()
.anyMatch(orgName -> orgName.equalsIgnoreCase(subject.getValue()))
))
.map(Role::getName);
.map(orgsMap -> acs.getRoles()
.stream()
.filter(role -> role.getSubjects()
.stream()
.filter(s -> s.getProvider().equals(Provider.OAUTH_GITHUB))
.filter(s -> s.getType().equals(ORGANIZATION))
.anyMatch(subject -> orgsMap.stream()
.map(org -> org.get(ORGANIZATION_NAME).toString())
.anyMatch(orgName -> orgName.equalsIgnoreCase(subject.getValue()))
))
.map(Role::getName)
.collect(Collectors.toSet()));
}

return Stream.concat(groupsByOrg, groupsByUsername.stream()).collect(Collectors.toSet());
});
@SuppressWarnings("unchecked")
private Mono<Set<String>> getTeamRoles(WebClient webClient, Map<String, Object> additionalParams,
AccessControlService acs) {

var requestedTeams = acs.getRoles()
.stream()
.filter(r -> r.getSubjects()
.stream()
.filter(s -> s.getProvider().equals(Provider.OAUTH_GITHUB))
.anyMatch(s -> s.getType().equals("team")))
.collect(Collectors.toSet());

if (requestedTeams.isEmpty()) {
log.debug("No roles with github teams found, skipping");
return Mono.just(Collections.emptySet());
}

final Mono<List<Map<String, Object>>> rawTeams = webClient
.get()
.uri(uriBuilder -> uriBuilder.path("/teams")
.queryParam("per_page", ORGANIZATIONS_PER_PAGE)
.build())
.headers(headers -> {
headers.set(HttpHeaders.ACCEPT, GITHUB_ACCEPT_HEADER);
OAuth2UserRequest request = (OAuth2UserRequest) additionalParams.get("request");
headers.setBearerAuth(request.getAccessToken().getTokenValue());
})
.retrieve()
//@formatter:off
.bodyToMono(new ParameterizedTypeReference<>() {});
//@formatter:on

final Mono<List<String>> mappedTeams = rawTeams
.map(teams -> teams.stream()
.map(teamInfo -> {
var name = teamInfo.get(TEAM_NAME);
var orgInfo = (Map<String, Object>) teamInfo.get(ORGANIZATION);
var orgName = orgInfo.get(ORGANIZATION_NAME);
return orgName + "/" + name;
})
.map(Object::toString)
.collect(Collectors.toList())
);

return mappedTeams
.map(teams -> acs.getRoles()
.stream()
.filter(role -> role.getSubjects()
.stream()
.filter(s -> s.getProvider().equals(Provider.OAUTH_GITHUB))
.filter(s -> s.getType().equals("team"))
.anyMatch(subject -> teams.stream()
.anyMatch(teamName -> teamName.equalsIgnoreCase(subject.getValue()))
))
.map(Role::getName)
.collect(Collectors.toSet()));
}

}

@ -18,6 +18,10 @@ import lombok.RequiredArgsConstructor;
@RequiredArgsConstructor(access = PRIVATE)
public class ApplicationMetrics {

// kafka-ui specific metrics prefix. Added to make it easier to distinguish kui metrics from
// other metrics, exposed by spring boot (like http stats, jvm, etc.)
private static final String COMMON_PREFIX = "kui_";

private final String clusterName;
private final MeterRegistry registry;

@ -40,7 +44,7 @@ public class ApplicationMetrics {
}

private Counter polledRecords(String topic) {
return Counter.builder("topic_records_polled")
return Counter.builder(COMMON_PREFIX + "topic_records_polled")
.description("Number of records polled from topic")
.tag("cluster", clusterName)
.tag("topic", topic)

@ -48,7 +52,7 @@ public class ApplicationMetrics {
}

private DistributionSummary polledBytes(String topic) {
return DistributionSummary.builder("topic_polled_bytes")
return DistributionSummary.builder(COMMON_PREFIX + "topic_polled_bytes")
.description("Bytes polled from kafka topic")
.tag("cluster", clusterName)
.tag("topic", topic)

@ -56,7 +60,7 @@ public class ApplicationMetrics {
}

private Timer pollTimer(String topic) {
return Timer.builder("topic_poll_time")
return Timer.builder(COMMON_PREFIX + "topic_poll_time")
.description("Time spend in polling for topic")
.tag("cluster", clusterName)
.tag("topic", topic)

@ -64,7 +68,7 @@ public class ApplicationMetrics {
}

private Counter pollThrottlingActivations() {
return Counter.builder("poll_throttling_activations")
return Counter.builder(COMMON_PREFIX + "poll_throttling_activations")
.description("Number of poll throttling activations")
.tag("cluster", clusterName)
.register(registry);

@ -72,7 +76,7 @@ public class ApplicationMetrics {

public AtomicInteger activeConsumers() {
var count = new AtomicInteger();
Gauge.builder("active_consumers", () -> count)
Gauge.builder(COMMON_PREFIX + "active_consumers", () -> count)
.description("Number of active consumers")
.tag("cluster", clusterName)
.register(registry);

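The ApplicationMetrics hunks above prefix every meter name with "kui_" so kafka-ui meters are easy to tell apart from the ones Spring Boot exposes on its own. A minimal, hedged Micrometer example of registering and bumping such a prefixed counter in isolation; the tag values are placeholders:

import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;

class PrefixedCounterExample {
  public static void main(String[] args) {
    var registry = new SimpleMeterRegistry();
    Counter polled = Counter.builder("kui_topic_records_polled")
        .description("Number of records polled from topic")
        .tag("cluster", "local")
        .tag("topic", "orders")
        .register(registry);
    polled.increment(42);
    System.out.println(polled.count()); // 42.0
  }
}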
@ -45,6 +45,7 @@ import reactor.core.publisher.Mono;
public class DynamicConfigOperations {

static final String DYNAMIC_CONFIG_ENABLED_ENV_PROPERTY = "dynamic.config.enabled";
static final String FILTERING_GROOVY_ENABLED_PROPERTY = "filtering.groovy.enabled";
static final String DYNAMIC_CONFIG_PATH_ENV_PROPERTY = "dynamic.config.path";
static final String DYNAMIC_CONFIG_PATH_ENV_PROPERTY_DEFAULT = "/etc/kafkaui/dynamic_config.yaml";

@ -64,6 +65,10 @@ public class DynamicConfigOperations {
return "true".equalsIgnoreCase(ctx.getEnvironment().getProperty(DYNAMIC_CONFIG_ENABLED_ENV_PROPERTY));
}

public boolean filteringGroovyEnabled() {
return "true".equalsIgnoreCase(ctx.getEnvironment().getProperty(FILTERING_GROOVY_ENABLED_PROPERTY));
}

private Path dynamicConfigFilePath() {
return Paths.get(
Optional.ofNullable(ctx.getEnvironment().getProperty(DYNAMIC_CONFIG_PATH_ENV_PROPERTY))

@ -147,6 +152,14 @@ public class DynamicConfigOperations {
.onErrorMap(th -> new FileUploadException(targetFilePath, th));
}

public void checkIfFilteringGroovyEnabled() {
if (!filteringGroovyEnabled()) {
throw new ValidationException(
"Groovy filters is not allowed. "
+ "Set filtering.groovy.enabled property to 'true' to enabled it.");
}
}

private void checkIfDynamicConfigEnabled() {
if (!dynamicConfigEnabled()) {
throw new ValidationException(

@ -1,34 +0,0 @@
package com.provectus.kafka.ui.util;

import com.provectus.kafka.ui.config.ClustersProperties;
import java.util.Properties;
import javax.annotation.Nullable;
import org.apache.kafka.common.config.SslConfigs;

public final class KafkaClientSslPropertiesUtil {

private KafkaClientSslPropertiesUtil() {
}

public static void addKafkaSslProperties(@Nullable ClustersProperties.TruststoreConfig truststoreConfig,
Properties sink) {
if (truststoreConfig == null) {
return;
}

if (truststoreConfig.getTruststoreLocation() == null) {
return;
}

sink.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, truststoreConfig.getTruststoreLocation());

if (truststoreConfig.getTruststorePassword() != null) {
sink.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, truststoreConfig.getTruststorePassword());
}

if (!truststoreConfig.isVerifySsl()) {
sink.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
}
}

}

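The removed utility above (and its SslPropertiesUtil counterpart used elsewhere in this diff) boils down to populating the standard Kafka client SSL keys. A hedged, minimal equivalent for any Kafka client Properties object; the location, password and flag here are placeholders:

import java.util.Properties;
import org.apache.kafka.common.config.SslConfigs;

class TruststorePropsExample {
  static Properties truststoreProps(String location, String password, boolean verifySsl) {
    Properties props = new Properties();
    props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, location);
    props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, password);
    if (!verifySsl) {
      // an empty algorithm disables hostname verification for the client
      props.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
    }
    return props;
  }
}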
@ -65,7 +65,7 @@ public final class KafkaServicesValidation {
@Nullable
TruststoreConfig ssl) {
Properties properties = new Properties();
KafkaClientSslPropertiesUtil.addKafkaSslProperties(ssl, properties);
SslPropertiesUtil.addKafkaSslProperties(ssl, properties);
properties.putAll(clusterProps);
properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
// editing properties to make validation faster

Some files were not shown because too many files have changed in this diff.