Merge branch 'master' into #3318

Roman Zabaluev 2023-08-04 19:54:13 +07:00 committed by GitHub
commit e1a79a4f3f
114 changed files with 1851 additions and 541 deletions

View file

@ -1,4 +1,4 @@
name: AWS Marketplace Publisher
name: "Infra: Release: AWS Marketplace Publisher"
on:
workflow_dispatch:
inputs:

View file

@ -1,4 +1,4 @@
name: Backend build and test
name: "Backend: PR/master build & test"
on:
push:
branches:
@ -8,6 +8,9 @@ on:
paths:
- "kafka-ui-api/**"
- "pom.xml"
permissions:
checks: write
pull-requests: write
jobs:
build-and-test:
runs-on: ubuntu-latest
@ -29,7 +32,7 @@ jobs:
key: ${{ runner.os }}-sonar
restore-keys: ${{ runner.os }}-sonar
- name: Build and analyze pull request target
if: ${{ github.event_name == 'pull_request_target' }}
if: ${{ github.event_name == 'pull_request' }}
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
SONAR_TOKEN: ${{ secrets.SONAR_TOKEN_BACKEND }}

View file

@ -1,4 +1,4 @@
name: Pull Request Labels
name: "Infra: PR block merge"
on:
pull_request:
types: [opened, labeled, unlabeled, synchronize]

View file

@ -1,4 +1,4 @@
name: Feature testing init
name: "Infra: Feature Testing: Init env"
on:
workflow_dispatch:

View file

@ -1,4 +1,4 @@
name: Feature testing destroy
name: "Infra: Feature Testing: Destroy env"
on:
workflow_dispatch:
pull_request:

View file

@ -1,4 +1,4 @@
name: Build Docker image and push
name: "Infra: Image Testing: Deploy"
on:
workflow_dispatch:
pull_request:
@ -65,7 +65,7 @@ jobs:
cache-from: type=local,src=/tmp/.buildx-cache
cache-to: type=local,dest=/tmp/.buildx-cache
- name: make comment with private deployment link
uses: peter-evans/create-or-update-comment@v2
uses: peter-evans/create-or-update-comment@v3
with:
issue-number: ${{ github.event.pull_request.number }}
body: |

View file

@ -1,4 +1,4 @@
name: Delete Public ECR Image
name: "Infra: Image Testing: Delete"
on:
workflow_dispatch:
pull_request:

View file

@ -1,4 +1,4 @@
name: Documentation URLs linter
name: "Infra: Docs: URL linter"
on:
pull_request:
types:

View file

@ -1,4 +1,4 @@
name: E2E Automation suite
name: "E2E: Automation suite"
on:
workflow_dispatch:
inputs:

View file

@ -1,4 +1,4 @@
name: E2E PR health check
name: "E2E: PR healthcheck"
on:
pull_request_target:
types: [ "opened", "edited", "reopened", "synchronize" ]
@ -8,6 +8,8 @@ on:
- "kafka-ui-react-app/**"
- "kafka-ui-e2e-checks/**"
- "pom.xml"
permissions:
statuses: write
jobs:
build-and-test:
runs-on: ubuntu-latest
@ -18,8 +20,8 @@ jobs:
- name: Configure AWS credentials
uses: aws-actions/configure-aws-credentials@v2
with:
aws-access-key-id: ${{ secrets.AWS_ACCESS_KEY_ID }}
aws-secret-access-key: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
aws-access-key-id: ${{ secrets.S3_AWS_ACCESS_KEY_ID }}
aws-secret-access-key: ${{ secrets.S3_AWS_SECRET_ACCESS_KEY }}
aws-region: eu-central-1
- name: Set up environment
id: set_env_values

View file

@ -1,4 +1,4 @@
name: E2E Manual suite
name: "E2E: Manual suite"
on:
workflow_dispatch:
inputs:

View file

@ -1,4 +1,4 @@
name: E2E Weekly suite
name: "E2E: Weekly suite"
on:
schedule:
- cron: '0 1 * * 1'

View file

@ -1,4 +1,4 @@
name: Frontend build and test
name: "Frontend: PR/master build & test"
on:
push:
branches:
@ -8,6 +8,9 @@ on:
paths:
- "kafka-ui-contract/**"
- "kafka-ui-react-app/**"
permissions:
checks: write
pull-requests: write
jobs:
build-and-test:
env:
@ -24,7 +27,7 @@ jobs:
with:
version: 7.4.0
- name: Install node
uses: actions/setup-node@v3.6.0
uses: actions/setup-node@v3.7.0
with:
node-version: "16.15.0"
cache: "pnpm"

View file

@ -1,4 +1,4 @@
name: Master branch build & deploy
name: "Master: Build & deploy"
on:
workflow_dispatch:
push:
@ -58,6 +58,7 @@ jobs:
builder: ${{ steps.buildx.outputs.name }}
context: kafka-ui-api
platforms: linux/amd64,linux/arm64
provenance: false
push: true
tags: |
provectuslabs/kafka-ui:${{ steps.build.outputs.version }}

View file

@ -1,8 +1,9 @@
name: "PR Checklist checked"
name: "PR: Checklist linter"
on:
pull_request_target:
types: [opened, edited, synchronize, reopened]
permissions:
checks: write
jobs:
task-check:
runs-on: ubuntu-latest

View file

@ -1,4 +1,4 @@
name: Release serde api
name: "Infra: Release: Serde API"
on: workflow_dispatch
jobs:

View file

@ -1,4 +1,4 @@
name: Release
name: "Infra: Release"
on:
release:
types: [published]

View file

@ -1,4 +1,4 @@
name: Release Drafter
name: "Infra: Release Drafter run"
on:
push:

View file

@ -1,4 +1,4 @@
name: Separate environment create
name: "Infra: Feature Testing Public: Init env"
on:
workflow_dispatch:
inputs:

View file

@ -1,4 +1,4 @@
name: Separate environment remove
name: "Infra: Feature Testing Public: Destroy env"
on:
workflow_dispatch:
inputs:

View file

@ -1,4 +1,4 @@
name: 'Close stale issues'
name: 'Infra: Close stale issues'
on:
schedule:
- cron: '30 1 * * *'

View file

@ -1,4 +1,4 @@
name: Terraform deploy
name: "Infra: Terraform deploy"
on:
workflow_dispatch:
inputs:

View file

@ -1,4 +1,4 @@
name: Add triage label to new issues
name: "Infra: Triage: Apply triage label for issues"
on:
issues:
types:

View file

@ -1,4 +1,4 @@
name: Add triage label to new PRs
name: "Infra: Triage: Apply triage label for PRs"
on:
pull_request:
types:

View file

@ -7,7 +7,9 @@ on:
issues:
types:
- opened
permissions:
issues: write
pull-requests: write
jobs:
welcome:
runs-on: ubuntu-latest

View file

@ -1,4 +1,4 @@
name: "Workflow linter"
name: "Infra: Workflow linter"
on:
pull_request:
types:

View file

@ -18,6 +18,10 @@
<a href="https://www.producthunt.com/products/ui-for-apache-kafka/reviews/new">ProductHunt</a>
</p>
<p align="center">
<img src="https://repobeats.axiom.co/api/embed/2e8a7c2d711af9daddd34f9791143e7554c35d0f.svg" />
</p>
#### UI for Apache Kafka is a free, open-source web UI to monitor and manage Apache Kafka clusters.
UI for Apache Kafka is a simple tool that makes your data flows observable, helps find and troubleshoot issues faster and deliver optimal performance. Its lightweight dashboard makes it easy to track key metrics of your Kafka clusters - Brokers, Topics, Partitions, Production, and Consumption.

View file

@ -1,7 +1,11 @@
#FROM azul/zulu-openjdk-alpine:17-jre-headless
FROM azul/zulu-openjdk-alpine@sha256:a36679ac0d28cb835e2a8c00e1e0d95509c6c51c5081c7782b85edb1f37a771a
RUN apk add --no-cache gcompat # need to make snappy codec work
RUN apk add --no-cache \
# snappy codec
gcompat \
# configuring timezones
tzdata
RUN addgroup -S kafkaui && adduser -S kafkaui -G kafkaui
# creating folder for dynamic config usage (certificates uploads, etc)

View file

@ -91,7 +91,7 @@
<dependency>
<groupId>software.amazon.msk</groupId>
<artifactId>aws-msk-iam-auth</artifactId>
<version>1.1.6</version>
<version>1.1.7</version>
</dependency>
<dependency>
@ -114,6 +114,11 @@
<artifactId>json</artifactId>
<version>${org.json.version}</version>
</dependency>
<dependency>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-registry-prometheus</artifactId>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>

View file

@ -13,6 +13,7 @@ abstract class AbstractAuthSecurityConfig {
"/resources/**",
"/actuator/health/**",
"/actuator/info",
"/actuator/prometheus",
"/auth",
"/login",
"/logout",

View file

@ -7,12 +7,10 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.security.config.annotation.web.reactive.EnableWebFluxSecurity;
import org.springframework.security.config.web.server.SecurityWebFiltersOrder;
import org.springframework.security.config.web.server.ServerHttpSecurity;
import org.springframework.security.web.server.SecurityWebFilterChain;
import org.springframework.security.web.server.authentication.RedirectServerAuthenticationSuccessHandler;
import org.springframework.security.web.server.authentication.logout.RedirectServerLogoutSuccessHandler;
import org.springframework.security.web.server.ui.LogoutPageGeneratingWebFilter;
@Configuration
@EnableWebFluxSecurity
@ -33,15 +31,17 @@ public class BasicAuthSecurityConfig extends AbstractAuthSecurityConfig {
final var logoutSuccessHandler = new RedirectServerLogoutSuccessHandler();
logoutSuccessHandler.setLogoutSuccessUrl(URI.create(LOGOUT_URL));
return http
.addFilterAfter(new LogoutPageGeneratingWebFilter(), SecurityWebFiltersOrder.REACTOR_CONTEXT)
.csrf().disable()
.authorizeExchange()
.pathMatchers(AUTH_WHITELIST).permitAll()
.anyExchange().authenticated()
.and().formLogin().loginPage(LOGIN_URL).authenticationSuccessHandler(authHandler)
.and().logout().logoutSuccessHandler(logoutSuccessHandler)
.and().build();
return http.authorizeExchange(spec -> spec
.pathMatchers(AUTH_WHITELIST)
.permitAll()
.anyExchange()
.authenticated()
)
.formLogin(spec -> spec.loginPage(LOGIN_URL).authenticationSuccessHandler(authHandler))
.logout(spec -> spec.logoutSuccessHandler(logoutSuccessHandler))
.csrf(ServerHttpSecurity.CsrfSpec::disable)
.build();
}
}

View file

@ -27,10 +27,12 @@ public class DisabledAuthSecurityConfig extends AbstractAuthSecurityConfig {
System.exit(1);
}
log.warn("Authentication is disabled. Access will be unrestricted.");
return http.authorizeExchange()
.anyExchange().permitAll()
.and()
.csrf().disable()
return http.authorizeExchange(spec -> spec
.anyExchange()
.permitAll()
)
.csrf(ServerHttpSecurity.CsrfSpec::disable)
.build();
}

View file

@ -15,6 +15,8 @@ public class LdapProperties {
private String userFilterSearchBase;
private String userFilterSearchFilter;
private String groupFilterSearchBase;
private String groupFilterSearchFilter;
private String groupRoleAttribute;
@Value("${oauth2.ldap.activeDirectory:false}")
private boolean isActiveDirectory;

View file

@ -3,14 +3,16 @@ package com.provectus.kafka.ui.config.auth;
import static com.provectus.kafka.ui.config.auth.AbstractAuthSecurityConfig.AUTH_WHITELIST;
import com.provectus.kafka.ui.service.rbac.AccessControlService;
import com.provectus.kafka.ui.service.rbac.extractor.RbacLdapAuthoritiesExtractor;
import java.util.Collection;
import java.util.List;
import javax.annotation.Nullable;
import java.util.Optional;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.autoconfigure.ldap.LdapAutoConfiguration;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
@ -22,6 +24,7 @@ import org.springframework.security.authentication.AuthenticationManager;
import org.springframework.security.authentication.ProviderManager;
import org.springframework.security.authentication.ReactiveAuthenticationManager;
import org.springframework.security.authentication.ReactiveAuthenticationManagerAdapter;
import org.springframework.security.config.Customizer;
import org.springframework.security.config.annotation.web.reactive.EnableWebFluxSecurity;
import org.springframework.security.config.web.server.ServerHttpSecurity;
import org.springframework.security.core.GrantedAuthority;
@ -50,9 +53,9 @@ public class LdapSecurityConfig {
@Bean
public ReactiveAuthenticationManager authenticationManager(BaseLdapPathContextSource contextSource,
LdapAuthoritiesPopulator ldapAuthoritiesPopulator,
@Nullable AccessControlService acs) {
var rbacEnabled = acs != null && acs.isRbacEnabled();
LdapAuthoritiesPopulator authoritiesExtractor,
AccessControlService acs) {
var rbacEnabled = acs.isRbacEnabled();
BindAuthenticator ba = new BindAuthenticator(contextSource);
if (props.getBase() != null) {
ba.setUserDnPatterns(new String[] {props.getBase()});
@ -67,7 +70,7 @@ public class LdapSecurityConfig {
AbstractLdapAuthenticationProvider authenticationProvider;
if (!props.isActiveDirectory()) {
authenticationProvider = rbacEnabled
? new LdapAuthenticationProvider(ba, ldapAuthoritiesPopulator)
? new LdapAuthenticationProvider(ba, authoritiesExtractor)
: new LdapAuthenticationProvider(ba);
} else {
authenticationProvider = new ActiveDirectoryLdapAuthenticationProvider(props.getActiveDirectoryDomain(),
@ -97,11 +100,24 @@ public class LdapSecurityConfig {
@Bean
@Primary
public LdapAuthoritiesPopulator ldapAuthoritiesPopulator(BaseLdapPathContextSource contextSource) {
var authoritiesPopulator = new DefaultLdapAuthoritiesPopulator(contextSource, props.getGroupFilterSearchBase());
authoritiesPopulator.setRolePrefix("");
authoritiesPopulator.setConvertToUpperCase(false);
return authoritiesPopulator;
public DefaultLdapAuthoritiesPopulator ldapAuthoritiesExtractor(ApplicationContext context,
BaseLdapPathContextSource contextSource,
AccessControlService acs) {
var rbacEnabled = acs != null && acs.isRbacEnabled();
DefaultLdapAuthoritiesPopulator extractor;
if (rbacEnabled) {
extractor = new RbacLdapAuthoritiesExtractor(context, contextSource, props.getGroupFilterSearchBase());
} else {
extractor = new DefaultLdapAuthoritiesPopulator(contextSource, props.getGroupFilterSearchBase());
}
Optional.ofNullable(props.getGroupFilterSearchFilter()).ifPresent(extractor::setGroupSearchFilter);
extractor.setRolePrefix("");
extractor.setConvertToUpperCase(false);
extractor.setSearchSubtree(true);
return extractor;
}
@Bean
@ -111,21 +127,15 @@ public class LdapSecurityConfig {
log.info("Active Directory support for LDAP has been enabled.");
}
return http
.authorizeExchange()
.pathMatchers(AUTH_WHITELIST)
.permitAll()
.anyExchange()
.authenticated()
.and()
.formLogin()
.and()
.logout()
.and()
.csrf().disable()
return http.authorizeExchange(spec -> spec
.pathMatchers(AUTH_WHITELIST)
.permitAll()
.anyExchange()
.authenticated()
)
.formLogin(Customizer.withDefaults())
.logout(Customizer.withDefaults())
.csrf(ServerHttpSecurity.CsrfSpec::disable)
.build();
}
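
RbacLdapAuthoritiesExtractor is referenced above but its body is not part of this excerpt; since it is assigned to a DefaultLdapAuthoritiesPopulator variable, it presumably extends that class. A hedged sketch of how the two new LdapProperties fields would typically be wired onto such a populator: only setGroupSearchFilter is actually applied in the bean above, the groupRoleAttribute wiring is an assumption added here for illustration.

import org.springframework.ldap.core.support.BaseLdapPathContextSource;
import org.springframework.security.ldap.userdetails.DefaultLdapAuthoritiesPopulator;

class LdapGroupPopulatorSketch { // hypothetical helper, not part of this commit
  static DefaultLdapAuthoritiesPopulator build(BaseLdapPathContextSource ctx, LdapProperties props) {
    var populator = new DefaultLdapAuthoritiesPopulator(ctx, props.getGroupFilterSearchBase());
    if (props.getGroupFilterSearchFilter() != null) {
      populator.setGroupSearchFilter(props.getGroupFilterSearchFilter()); // e.g. "memberUid={1}"
    }
    if (props.getGroupRoleAttribute() != null) {          // assumed Lombok getter for the new field
      populator.setGroupRoleAttribute(props.getGroupRoleAttribute()); // e.g. "cn"
    }
    populator.setRolePrefix("");
    populator.setConvertToUpperCase(false);
    populator.setSearchSubtree(true);
    return populator;
  }
}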

View file

@ -12,10 +12,11 @@ import lombok.extern.log4j.Log4j2;
import org.jetbrains.annotations.Nullable;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.autoconfigure.security.oauth2.client.OAuth2ClientProperties;
import org.springframework.boot.autoconfigure.security.oauth2.client.OAuth2ClientPropertiesRegistrationAdapter;
import org.springframework.boot.autoconfigure.security.oauth2.client.OAuth2ClientPropertiesMapper;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.security.config.Customizer;
import org.springframework.security.config.annotation.method.configuration.EnableReactiveMethodSecurity;
import org.springframework.security.config.annotation.web.reactive.EnableWebFluxSecurity;
import org.springframework.security.config.web.server.ServerHttpSecurity;
@ -49,21 +50,15 @@ public class OAuthSecurityConfig extends AbstractAuthSecurityConfig {
public SecurityWebFilterChain configure(ServerHttpSecurity http, OAuthLogoutSuccessHandler logoutHandler) {
log.info("Configuring OAUTH2 authentication.");
return http.authorizeExchange()
.pathMatchers(AUTH_WHITELIST)
.permitAll()
.anyExchange()
.authenticated()
.and()
.oauth2Login()
.and()
.logout()
.logoutSuccessHandler(logoutHandler)
.and()
.csrf().disable()
return http.authorizeExchange(spec -> spec
.pathMatchers(AUTH_WHITELIST)
.permitAll()
.anyExchange()
.authenticated()
)
.oauth2Login(Customizer.withDefaults())
.logout(spec -> spec.logoutSuccessHandler(logoutHandler))
.csrf(ServerHttpSecurity.CsrfSpec::disable)
.build();
}
@ -103,7 +98,10 @@ public class OAuthSecurityConfig extends AbstractAuthSecurityConfig {
public InMemoryReactiveClientRegistrationRepository clientRegistrationRepository() {
final OAuth2ClientProperties props = OAuthPropertiesConverter.convertProperties(properties);
final List<ClientRegistration> registrations =
new ArrayList<>(OAuth2ClientPropertiesRegistrationAdapter.getClientRegistrations(props).values());
new ArrayList<>(new OAuth2ClientPropertiesMapper(props).asClientRegistrations().values());
if (registrations.isEmpty()) {
throw new IllegalArgumentException("OAuth2 authentication is enabled but no providers specified.");
}
return new InMemoryReactiveClientRegistrationRepository(registrations);
}

View file

@ -1,13 +1,14 @@
package com.provectus.kafka.ui.config.auth.condition;
import com.provectus.kafka.ui.service.rbac.AbstractProviderCondition;
import org.jetbrains.annotations.NotNull;
import org.springframework.context.annotation.Condition;
import org.springframework.context.annotation.ConditionContext;
import org.springframework.core.type.AnnotatedTypeMetadata;
public class CognitoCondition extends AbstractProviderCondition implements Condition {
@Override
public boolean matches(final ConditionContext context, final AnnotatedTypeMetadata metadata) {
public boolean matches(final ConditionContext context, final @NotNull AnnotatedTypeMetadata metadata) {
return getRegisteredProvidersTypes(context.getEnvironment()).stream().anyMatch(a -> a.equalsIgnoreCase("cognito"));
}
}
}

View file

@ -2,6 +2,9 @@ package com.provectus.kafka.ui.controller;
import com.provectus.kafka.ui.api.AclsApi;
import com.provectus.kafka.ui.mapper.ClusterMapper;
import com.provectus.kafka.ui.model.CreateConsumerAclDTO;
import com.provectus.kafka.ui.model.CreateProducerAclDTO;
import com.provectus.kafka.ui.model.CreateStreamAppAclDTO;
import com.provectus.kafka.ui.model.KafkaAclDTO;
import com.provectus.kafka.ui.model.KafkaAclNamePatternTypeDTO;
import com.provectus.kafka.ui.model.KafkaAclResourceTypeDTO;
@ -123,4 +126,55 @@ public class AclsController extends AbstractController implements AclsApi {
.doOnEach(sig -> auditService.audit(context, sig))
.thenReturn(ResponseEntity.ok().build());
}
@Override
public Mono<ResponseEntity<Void>> createConsumerAcl(String clusterName,
Mono<CreateConsumerAclDTO> createConsumerAclDto,
ServerWebExchange exchange) {
AccessContext context = AccessContext.builder()
.cluster(clusterName)
.aclActions(AclAction.EDIT)
.operationName("createConsumerAcl")
.build();
return accessControlService.validateAccess(context)
.then(createConsumerAclDto)
.flatMap(req -> aclsService.createConsumerAcl(getCluster(clusterName), req))
.doOnEach(sig -> auditService.audit(context, sig))
.thenReturn(ResponseEntity.ok().build());
}
@Override
public Mono<ResponseEntity<Void>> createProducerAcl(String clusterName,
Mono<CreateProducerAclDTO> createProducerAclDto,
ServerWebExchange exchange) {
AccessContext context = AccessContext.builder()
.cluster(clusterName)
.aclActions(AclAction.EDIT)
.operationName("createProducerAcl")
.build();
return accessControlService.validateAccess(context)
.then(createProducerAclDto)
.flatMap(req -> aclsService.createProducerAcl(getCluster(clusterName), req))
.doOnEach(sig -> auditService.audit(context, sig))
.thenReturn(ResponseEntity.ok().build());
}
@Override
public Mono<ResponseEntity<Void>> createStreamAppAcl(String clusterName,
Mono<CreateStreamAppAclDTO> createStreamAppAclDto,
ServerWebExchange exchange) {
AccessContext context = AccessContext.builder()
.cluster(clusterName)
.aclActions(AclAction.EDIT)
.operationName("createStreamAppAcl")
.build();
return accessControlService.validateAccess(context)
.then(createStreamAppAclDto)
.flatMap(req -> aclsService.createStreamAppAcl(getCluster(clusterName), req))
.doOnEach(sig -> auditService.audit(context, sig))
.thenReturn(ResponseEntity.ok().build());
}
}
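
The three new endpoints delegate to AclsService, whose createConsumerAcl/createProducerAcl/createStreamAppAcl implementations are not in this excerpt. As a rough sketch of what a conventional consumer grant looks like with the plain Kafka Admin API (principal, topic and group names are made up, and this is not necessarily the exact binding set the service creates):

import java.util.List;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.common.acl.AccessControlEntry;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.acl.AclPermissionType;
import org.apache.kafka.common.resource.PatternType;
import org.apache.kafka.common.resource.ResourcePattern;
import org.apache.kafka.common.resource.ResourceType;

class ConsumerAclSketch { // illustration only
  // Typical consumer grant: READ + DESCRIBE on the topic, READ on the consumer group.
  static void grantConsumer(AdminClient admin, String principal, String topic, String group) {
    AccessControlEntry read = new AccessControlEntry(principal, "*", AclOperation.READ, AclPermissionType.ALLOW);
    AccessControlEntry describe = new AccessControlEntry(principal, "*", AclOperation.DESCRIBE, AclPermissionType.ALLOW);
    admin.createAcls(List.of(
        new AclBinding(new ResourcePattern(ResourceType.TOPIC, topic, PatternType.LITERAL), read),
        new AclBinding(new ResourcePattern(ResourceType.TOPIC, topic, PatternType.LITERAL), describe),
        new AclBinding(new ResourcePattern(ResourceType.GROUP, group, PatternType.LITERAL), read)
    )).all();
  }
}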

View file

@ -123,9 +123,11 @@ public class TopicsController extends AbstractController implements TopicsApi {
.operationName("deleteTopic")
.build();
return accessControlService.validateAccess(context).then(
topicsService.deleteTopic(getCluster(clusterName), topicName).map(ResponseEntity::ok)
).doOnEach(sig -> auditService.audit(context, sig));
return accessControlService.validateAccess(context)
.then(
topicsService.deleteTopic(getCluster(clusterName), topicName)
.thenReturn(ResponseEntity.ok().<Void>build())
).doOnEach(sig -> auditService.audit(context, sig));
}

View file

@ -2,37 +2,28 @@ package com.provectus.kafka.ui.emitter;
import com.provectus.kafka.ui.model.TopicMessageEventDTO;
import java.time.Duration;
import java.time.Instant;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.utils.Bytes;
import reactor.core.publisher.FluxSink;
public abstract class AbstractEmitter {
private final MessagesProcessing messagesProcessing;
private final PollingThrottler throttler;
protected final PollingSettings pollingSettings;
protected AbstractEmitter(MessagesProcessing messagesProcessing, PollingSettings pollingSettings) {
this.messagesProcessing = messagesProcessing;
this.pollingSettings = pollingSettings;
this.throttler = pollingSettings.getPollingThrottler();
}
protected ConsumerRecords<Bytes, Bytes> poll(
FluxSink<TopicMessageEventDTO> sink, Consumer<Bytes, Bytes> consumer) {
protected PolledRecords poll(
FluxSink<TopicMessageEventDTO> sink, EnhancedConsumer consumer) {
return poll(sink, consumer, pollingSettings.getPollTimeout());
}
protected ConsumerRecords<Bytes, Bytes> poll(
FluxSink<TopicMessageEventDTO> sink, Consumer<Bytes, Bytes> consumer, Duration timeout) {
Instant start = Instant.now();
ConsumerRecords<Bytes, Bytes> records = consumer.poll(timeout);
Instant finish = Instant.now();
int polledBytes = sendConsuming(sink, records, Duration.between(start, finish).toMillis());
throttler.throttleAfterPoll(polledBytes);
protected PolledRecords poll(FluxSink<TopicMessageEventDTO> sink, EnhancedConsumer consumer, Duration timeout) {
var records = consumer.pollEnhanced(timeout);
sendConsuming(sink, records);
return records;
}
@ -49,10 +40,8 @@ public abstract class AbstractEmitter {
messagesProcessing.sendPhase(sink, name);
}
protected int sendConsuming(FluxSink<TopicMessageEventDTO> sink,
ConsumerRecords<Bytes, Bytes> records,
long elapsed) {
return messagesProcessing.sentConsumingInfo(sink, records, elapsed);
protected void sendConsuming(FluxSink<TopicMessageEventDTO> sink, PolledRecords records) {
messagesProcessing.sentConsumingInfo(sink, records);
}
protected void sendFinishStatsAndCompleteSink(FluxSink<TopicMessageEventDTO> sink) {

View file

@ -9,9 +9,7 @@ import java.util.List;
import java.util.TreeMap;
import java.util.function.Supplier;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.utils.Bytes;
@ -22,12 +20,12 @@ public class BackwardRecordEmitter
extends AbstractEmitter
implements java.util.function.Consumer<FluxSink<TopicMessageEventDTO>> {
private final Supplier<KafkaConsumer<Bytes, Bytes>> consumerSupplier;
private final Supplier<EnhancedConsumer> consumerSupplier;
private final ConsumerPosition consumerPosition;
private final int messagesPerPage;
public BackwardRecordEmitter(
Supplier<KafkaConsumer<Bytes, Bytes>> consumerSupplier,
Supplier<EnhancedConsumer> consumerSupplier,
ConsumerPosition consumerPosition,
int messagesPerPage,
MessagesProcessing messagesProcessing,
@ -41,7 +39,7 @@ public class BackwardRecordEmitter
@Override
public void accept(FluxSink<TopicMessageEventDTO> sink) {
log.debug("Starting backward polling for {}", consumerPosition);
try (KafkaConsumer<Bytes, Bytes> consumer = consumerSupplier.get()) {
try (EnhancedConsumer consumer = consumerSupplier.get()) {
sendPhase(sink, "Created consumer");
var seekOperations = SeekOperations.create(consumer, consumerPosition);
@ -91,7 +89,7 @@ public class BackwardRecordEmitter
TopicPartition tp,
long fromOffset,
long toOffset,
Consumer<Bytes, Bytes> consumer,
EnhancedConsumer consumer,
FluxSink<TopicMessageEventDTO> sink
) {
consumer.assign(Collections.singleton(tp));
@ -101,13 +99,13 @@ public class BackwardRecordEmitter
var recordsToSend = new ArrayList<ConsumerRecord<Bytes, Bytes>>();
EmptyPollsCounter emptyPolls = pollingSettings.createEmptyPollsCounter();
EmptyPollsCounter emptyPolls = pollingSettings.createEmptyPollsCounter();
while (!sink.isCancelled()
&& !sendLimitReached()
&& recordsToSend.size() < desiredMsgsToPoll
&& !emptyPolls.noDataEmptyPollsReached()) {
var polledRecords = poll(sink, consumer, pollingSettings.getPartitionPollTimeout());
emptyPolls.count(polledRecords);
emptyPolls.count(polledRecords.count());
log.debug("{} records polled from {}", polledRecords.count(), tp);
@ -115,7 +113,7 @@ public class BackwardRecordEmitter
.filter(r -> r.offset() < toOffset)
.toList();
if (!polledRecords.isEmpty() && filteredRecords.isEmpty()) {
if (polledRecords.count() > 0 && filteredRecords.isEmpty()) {
// we already read all messages in target offsets interval
break;
}

View file

@ -2,9 +2,6 @@ package com.provectus.kafka.ui.emitter;
import com.provectus.kafka.ui.model.TopicMessageConsumingDTO;
import com.provectus.kafka.ui.model.TopicMessageEventDTO;
import com.provectus.kafka.ui.util.ConsumerRecordsUtil;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.utils.Bytes;
import reactor.core.publisher.FluxSink;
class ConsumingStats {
@ -13,23 +10,17 @@ class ConsumingStats {
private int records = 0;
private long elapsed = 0;
/**
* returns bytes polled.
*/
int sendConsumingEvt(FluxSink<TopicMessageEventDTO> sink,
ConsumerRecords<Bytes, Bytes> polledRecords,
long elapsed,
void sendConsumingEvt(FluxSink<TopicMessageEventDTO> sink,
PolledRecords polledRecords,
int filterApplyErrors) {
int polledBytes = ConsumerRecordsUtil.calculatePolledSize(polledRecords);
bytes += polledBytes;
bytes += polledRecords.bytes();
this.records += polledRecords.count();
this.elapsed += elapsed;
this.elapsed += polledRecords.elapsed().toMillis();
sink.next(
new TopicMessageEventDTO()
.type(TopicMessageEventDTO.TypeEnum.CONSUMING)
.consuming(createConsumingStats(sink, filterApplyErrors))
);
return polledBytes;
}
void sendFinishEvent(FluxSink<TopicMessageEventDTO> sink, int filterApplyErrors) {

View file

@ -17,8 +17,8 @@ public class EmptyPollsCounter {
this.maxEmptyPolls = maxEmptyPolls;
}
public void count(ConsumerRecords<?, ?> polled) {
emptyPolls = polled.isEmpty() ? emptyPolls + 1 : 0;
public void count(int polledCount) {
emptyPolls = polledCount == 0 ? emptyPolls + 1 : 0;
}
public boolean noDataEmptyPollsReached() {

View file

@ -0,0 +1,82 @@
package com.provectus.kafka.ui.emitter;
import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
import com.provectus.kafka.ui.util.ApplicationMetrics;
import java.time.Duration;
import java.util.Collection;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import lombok.RequiredArgsConstructor;
import lombok.experimental.Delegate;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.BytesDeserializer;
import org.apache.kafka.common.utils.Bytes;
public class EnhancedConsumer extends KafkaConsumer<Bytes, Bytes> {
private final PollingThrottler throttler;
private final ApplicationMetrics metrics;
private String pollingTopic;
public EnhancedConsumer(Properties properties,
PollingThrottler throttler,
ApplicationMetrics metrics) {
super(properties, new BytesDeserializer(), new BytesDeserializer());
this.throttler = throttler;
this.metrics = metrics;
metrics.activeConsumers().incrementAndGet();
}
public PolledRecords pollEnhanced(Duration dur) {
var stopwatch = Stopwatch.createStarted();
ConsumerRecords<Bytes, Bytes> polled = poll(dur);
PolledRecords polledEnhanced = PolledRecords.create(polled, stopwatch.elapsed());
var throttled = throttler.throttleAfterPoll(polledEnhanced.bytes());
metrics.meterPolledRecords(pollingTopic, polledEnhanced, throttled);
return polledEnhanced;
}
@Override
public void assign(Collection<TopicPartition> partitions) {
super.assign(partitions);
Set<String> assignedTopics = partitions.stream().map(TopicPartition::topic).collect(Collectors.toSet());
Preconditions.checkState(assignedTopics.size() == 1);
this.pollingTopic = assignedTopics.iterator().next();
}
@Override
public void subscribe(Pattern pattern) {
throw new UnsupportedOperationException();
}
@Override
public void subscribe(Collection<String> topics) {
throw new UnsupportedOperationException();
}
@Override
public void subscribe(Pattern pattern, ConsumerRebalanceListener listener) {
throw new UnsupportedOperationException();
}
@Override
public void subscribe(Collection<String> topics, ConsumerRebalanceListener listener) {
throw new UnsupportedOperationException();
}
@Override
public void close(Duration timeout) {
metrics.activeConsumers().decrementAndGet();
super.close(timeout);
}
}
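
A short usage sketch of the new wrapper, assuming the createConsumer factory shown later in this commit and the project classes (EnhancedConsumer, PolledRecords, KafkaCluster, ConsumerGroupService) on the classpath: only assign() is supported (the subscribe() overloads deliberately throw), and each pollEnhanced() call feeds byte and latency figures to ApplicationMetrics and the per-cluster throttler.

import java.time.Duration;
import java.util.List;
import org.apache.kafka.common.TopicPartition;

class EnhancedConsumerUsageSketch { // illustration only, topic name is made up
  static void pollOnce(ConsumerGroupService consumerGroupService, KafkaCluster cluster) {
    try (EnhancedConsumer consumer = consumerGroupService.createConsumer(cluster)) {
      consumer.assign(List.of(new TopicPartition("orders", 0))); // a single topic; subscribe() is unsupported
      consumer.seekToBeginning(consumer.assignment());
      PolledRecords polled = consumer.pollEnhanced(Duration.ofSeconds(1));
      System.out.printf("%d records / %d bytes in %s%n", polled.count(), polled.bytes(), polled.elapsed());
    }
  }
}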

View file

@ -5,8 +5,6 @@ import com.provectus.kafka.ui.model.TopicMessageEventDTO;
import java.util.function.Supplier;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.utils.Bytes;
import reactor.core.publisher.FluxSink;
@ -16,11 +14,11 @@ public class ForwardRecordEmitter
extends AbstractEmitter
implements java.util.function.Consumer<FluxSink<TopicMessageEventDTO>> {
private final Supplier<KafkaConsumer<Bytes, Bytes>> consumerSupplier;
private final Supplier<EnhancedConsumer> consumerSupplier;
private final ConsumerPosition position;
public ForwardRecordEmitter(
Supplier<KafkaConsumer<Bytes, Bytes>> consumerSupplier,
Supplier<EnhancedConsumer> consumerSupplier,
ConsumerPosition position,
MessagesProcessing messagesProcessing,
PollingSettings pollingSettings) {
@ -32,7 +30,7 @@ public class ForwardRecordEmitter
@Override
public void accept(FluxSink<TopicMessageEventDTO> sink) {
log.debug("Starting forward polling for {}", position);
try (KafkaConsumer<Bytes, Bytes> consumer = consumerSupplier.get()) {
try (EnhancedConsumer consumer = consumerSupplier.get()) {
sendPhase(sink, "Assigning partitions");
var seekOperations = SeekOperations.create(consumer, position);
seekOperations.assignAndSeekNonEmptyPartitions();
@ -44,8 +42,8 @@ public class ForwardRecordEmitter
&& !emptyPolls.noDataEmptyPollsReached()) {
sendPhase(sink, "Polling");
ConsumerRecords<Bytes, Bytes> records = poll(sink, consumer);
emptyPolls.count(records);
var records = poll(sink, consumer);
emptyPolls.count(records.count());
log.debug("{} records polled", records.count());

View file

@ -8,7 +8,6 @@ import java.util.function.Predicate;
import javax.annotation.Nullable;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.utils.Bytes;
import reactor.core.publisher.FluxSink;
@ -54,13 +53,10 @@ public class MessagesProcessing {
}
}
int sentConsumingInfo(FluxSink<TopicMessageEventDTO> sink,
ConsumerRecords<Bytes, Bytes> polledRecords,
long elapsed) {
void sentConsumingInfo(FluxSink<TopicMessageEventDTO> sink, PolledRecords polledRecords) {
if (!sink.isCancelled()) {
return consumingStats.sendConsumingEvt(sink, polledRecords, elapsed, filterApplyErrors);
consumingStats.sendConsumingEvt(sink, polledRecords, filterApplyErrors);
}
return 0;
}
void sendFinishEvent(FluxSink<TopicMessageEventDTO> sink) {

View file

@ -0,0 +1,48 @@
package com.provectus.kafka.ui.emitter;
import java.time.Duration;
import java.util.Iterator;
import java.util.List;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.utils.Bytes;
public record PolledRecords(int count,
int bytes,
Duration elapsed,
ConsumerRecords<Bytes, Bytes> records) implements Iterable<ConsumerRecord<Bytes, Bytes>> {
static PolledRecords create(ConsumerRecords<Bytes, Bytes> polled, Duration pollDuration) {
return new PolledRecords(
polled.count(),
calculatePolledRecSize(polled),
pollDuration,
polled
);
}
public List<ConsumerRecord<Bytes, Bytes>> records(TopicPartition tp) {
return records.records(tp);
}
@Override
public Iterator<ConsumerRecord<Bytes, Bytes>> iterator() {
return records.iterator();
}
private static int calculatePolledRecSize(Iterable<ConsumerRecord<Bytes, Bytes>> recs) {
int polledBytes = 0;
for (ConsumerRecord<Bytes, Bytes> rec : recs) {
for (Header header : rec.headers()) {
polledBytes +=
(header.key() != null ? header.key().getBytes().length : 0)
+ (header.value() != null ? header.value().length : 0);
}
polledBytes += rec.key() == null ? 0 : rec.serializedKeySize();
polledBytes += rec.value() == null ? 0 : rec.serializedValueSize();
}
return polledBytes;
}
}

View file

@ -3,11 +3,8 @@ package com.provectus.kafka.ui.emitter;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.RateLimiter;
import com.provectus.kafka.ui.config.ClustersProperties;
import com.provectus.kafka.ui.util.ConsumerRecordsUtil;
import java.util.function.Supplier;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.utils.Bytes;
@Slf4j
public class PollingThrottler {
@ -36,18 +33,17 @@ public class PollingThrottler {
return new PollingThrottler("noop", RateLimiter.create(Long.MAX_VALUE));
}
public void throttleAfterPoll(int polledBytes) {
//returns true if polling was throttled
public boolean throttleAfterPoll(int polledBytes) {
if (polledBytes > 0) {
double sleptSeconds = rateLimiter.acquire(polledBytes);
if (!throttled && sleptSeconds > 0.0) {
throttled = true;
log.debug("Polling throttling enabled for cluster {} at rate {} bytes/sec", clusterName, rateLimiter.getRate());
return true;
}
}
}
public void throttleAfterPoll(ConsumerRecords<Bytes, Bytes> polled) {
throttleAfterPoll(ConsumerRecordsUtil.calculatePolledSize(polled));
return false;
}
}
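
throttleAfterPoll now reports whether throttling actually happened, so EnhancedConsumer can meter it. The mechanics are Guava's RateLimiter, where acquire(permits) blocks and returns the seconds spent waiting; a standalone illustration (the 10 MiB/s budget is arbitrary):

import com.google.common.util.concurrent.RateLimiter;

class ThrottleSketch { // illustration only
  public static void main(String[] args) {
    RateLimiter limiter = RateLimiter.create(10 * 1024 * 1024); // permits are treated as bytes per second
    int polledBytes = 5 * 1024 * 1024;
    double sleptSeconds = limiter.acquire(polledBytes); // blocks if the byte budget is exhausted
    boolean throttled = sleptSeconds > 0.0;             // the same signal PollingThrottler now returns
    System.out.println("throttled=" + throttled + ", slept=" + sleptSeconds + "s");
  }
}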

View file

@ -5,19 +5,17 @@ import com.provectus.kafka.ui.model.TopicMessageEventDTO;
import java.util.HashMap;
import java.util.function.Supplier;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.utils.Bytes;
import reactor.core.publisher.FluxSink;
@Slf4j
public class TailingEmitter extends AbstractEmitter
implements java.util.function.Consumer<FluxSink<TopicMessageEventDTO>> {
private final Supplier<KafkaConsumer<Bytes, Bytes>> consumerSupplier;
private final Supplier<EnhancedConsumer> consumerSupplier;
private final ConsumerPosition consumerPosition;
public TailingEmitter(Supplier<KafkaConsumer<Bytes, Bytes>> consumerSupplier,
public TailingEmitter(Supplier<EnhancedConsumer> consumerSupplier,
ConsumerPosition consumerPosition,
MessagesProcessing messagesProcessing,
PollingSettings pollingSettings) {
@ -29,7 +27,7 @@ public class TailingEmitter extends AbstractEmitter
@Override
public void accept(FluxSink<TopicMessageEventDTO> sink) {
log.debug("Starting tailing polling for {}", consumerPosition);
try (KafkaConsumer<Bytes, Bytes> consumer = consumerSupplier.get()) {
try (EnhancedConsumer consumer = consumerSupplier.get()) {
assignAndSeek(consumer);
while (!sink.isCancelled()) {
sendPhase(sink, "Polling");
@ -47,7 +45,7 @@ public class TailingEmitter extends AbstractEmitter
}
}
private void assignAndSeek(KafkaConsumer<Bytes, Bytes> consumer) {
private void assignAndSeek(EnhancedConsumer consumer) {
var seekOperations = SeekOperations.create(consumer, consumerPosition);
var seekOffsets = new HashMap<>(seekOperations.getEndOffsets()); // defaulting offsets to topic end
seekOffsets.putAll(seekOperations.getOffsetsForSeek()); // this will only set non-empty partitions

View file

@ -34,7 +34,7 @@ public interface KafkaConnectMapper {
com.provectus.kafka.ui.connect.model.ConnectorPluginConfigValidationResponse
connectorPluginConfigValidationResponse);
default FullConnectorInfoDTO fullConnectorInfoFromTuple(InternalConnectInfo connectInfo) {
default FullConnectorInfoDTO fullConnectorInfo(InternalConnectInfo connectInfo) {
ConnectorDTO connector = connectInfo.getConnector();
List<TaskDTO> tasks = connectInfo.getTasks();
int failedTasksCount = (int) tasks.stream()

View file

@ -3,18 +3,21 @@ package com.provectus.kafka.ui.mapper;
import com.provectus.kafka.ui.model.CompatibilityCheckResponseDTO;
import com.provectus.kafka.ui.model.CompatibilityLevelDTO;
import com.provectus.kafka.ui.model.NewSchemaSubjectDTO;
import com.provectus.kafka.ui.model.SchemaReferenceDTO;
import com.provectus.kafka.ui.model.SchemaSubjectDTO;
import com.provectus.kafka.ui.model.SchemaTypeDTO;
import com.provectus.kafka.ui.service.SchemaRegistryService;
import com.provectus.kafka.ui.sr.model.Compatibility;
import com.provectus.kafka.ui.sr.model.CompatibilityCheckResponse;
import com.provectus.kafka.ui.sr.model.NewSubject;
import com.provectus.kafka.ui.sr.model.SchemaReference;
import com.provectus.kafka.ui.sr.model.SchemaType;
import java.util.List;
import java.util.Optional;
import org.mapstruct.Mapper;
@Mapper(componentModel = "spring")
@Mapper
public interface KafkaSrMapper {
default SchemaSubjectDTO toDto(SchemaRegistryService.SubjectWithCompatibilityLevel s) {
@ -24,9 +27,12 @@ public interface KafkaSrMapper {
.subject(s.getSubject())
.schema(s.getSchema())
.schemaType(SchemaTypeDTO.fromValue(Optional.ofNullable(s.getSchemaType()).orElse(SchemaType.AVRO).getValue()))
.references(toDto(s.getReferences()))
.compatibilityLevel(s.getCompatibility().toString());
}
List<SchemaReferenceDTO> toDto(List<SchemaReference> references);
CompatibilityCheckResponseDTO toDto(CompatibilityCheckResponse ccr);
CompatibilityLevelDTO.CompatibilityEnum toDto(Compatibility compatibility);
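
Dropping componentModel = "spring" means MapStruct no longer registers the mapper as a Spring bean; with the default component model an instance typically comes from the generated factory, and the new list overload is generated element-wise. A hedged fragment (KafkaSrMapper, SchemaReference and SchemaReferenceDTO are the project types from this hunk):

import java.util.List;
import org.mapstruct.factory.Mappers;

class KafkaSrMapperUsageSketch { // illustration only
  static List<SchemaReferenceDTO> mapRefs(List<SchemaReference> references) {
    KafkaSrMapper mapper = Mappers.getMapper(KafkaSrMapper.class); // instead of Spring injection
    return mapper.toDto(references);
  }
}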

View file

@ -16,6 +16,7 @@ import com.provectus.kafka.ui.model.rbac.permission.KsqlAction;
import com.provectus.kafka.ui.model.rbac.permission.SchemaAction;
import com.provectus.kafka.ui.model.rbac.permission.TopicAction;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.regex.Pattern;
import javax.annotation.Nullable;
@ -73,6 +74,10 @@ public class Permission {
}
private List<String> getAllActionValues() {
if (resource == null) {
return Collections.emptyList();
}
return switch (this.resource) {
case APPLICATIONCONFIG -> Arrays.stream(ApplicationConfigAction.values()).map(Enum::toString).toList();
case CLUSTERCONFIG -> Arrays.stream(ClusterConfigAction.values()).map(Enum::toString).toList();

View file

@ -12,6 +12,7 @@ import com.provectus.kafka.ui.serde.api.Serde;
import com.provectus.kafka.ui.serdes.builtin.AvroEmbeddedSerde;
import com.provectus.kafka.ui.serdes.builtin.Base64Serde;
import com.provectus.kafka.ui.serdes.builtin.ConsumerOffsetsSerde;
import com.provectus.kafka.ui.serdes.builtin.HexSerde;
import com.provectus.kafka.ui.serdes.builtin.Int32Serde;
import com.provectus.kafka.ui.serdes.builtin.Int64Serde;
import com.provectus.kafka.ui.serdes.builtin.ProtobufFileSerde;
@ -47,6 +48,7 @@ public class SerdesInitializer {
.put(UInt64Serde.name(), UInt64Serde.class)
.put(AvroEmbeddedSerde.name(), AvroEmbeddedSerde.class)
.put(Base64Serde.name(), Base64Serde.class)
.put(HexSerde.name(), HexSerde.class)
.put(UuidBinarySerde.name(), UuidBinarySerde.class)
.build(),
new CustomSerdeLoader()

View file

@ -19,12 +19,6 @@ public class AvroEmbeddedSerde implements BuiltInSerde {
return "Avro (Embedded)";
}
@Override
public void configure(PropertyResolver serdeProperties,
PropertyResolver kafkaClusterProperties,
PropertyResolver globalProperties) {
}
@Override
public Optional<String> getDescription() {
return Optional.empty();

View file

@ -1,8 +1,6 @@
package com.provectus.kafka.ui.serdes.builtin;
import com.provectus.kafka.ui.serde.api.DeserializeResult;
import com.provectus.kafka.ui.serde.api.PropertyResolver;
import com.provectus.kafka.ui.serde.api.RecordHeaders;
import com.provectus.kafka.ui.serde.api.SchemaDescription;
import com.provectus.kafka.ui.serdes.BuiltInSerde;
import java.util.Base64;
@ -16,12 +14,6 @@ public class Base64Serde implements BuiltInSerde {
return "Base64";
}
@Override
public void configure(PropertyResolver serdeProperties,
PropertyResolver kafkaClusterProperties,
PropertyResolver globalProperties) {
}
@Override
public Optional<String> getDescription() {
return Optional.empty();
@ -44,31 +36,25 @@ public class Base64Serde implements BuiltInSerde {
@Override
public Serializer serializer(String topic, Target type) {
return new Serializer() {
@Override
public byte[] serialize(String input) {
input = input.trim();
// it is actually a hack to provide ability to sent empty array as a key/value
if (input.length() == 0) {
return new byte[]{};
}
return Base64.getDecoder().decode(input);
var decoder = Base64.getDecoder();
return inputString -> {
inputString = inputString.trim();
// it is actually a hack to provide ability to sent empty array as a key/value
if (inputString.length() == 0) {
return new byte[] {};
}
return decoder.decode(inputString);
};
}
@Override
public Deserializer deserializer(String topic, Target type) {
var encoder = Base64.getEncoder();
return new Deserializer() {
@Override
public DeserializeResult deserialize(RecordHeaders headers, byte[] data) {
return new DeserializeResult(
return (headers, data) ->
new DeserializeResult(
encoder.encodeToString(data),
DeserializeResult.Type.STRING,
Map.of()
);
}
};
}
}

View file

@ -0,0 +1,89 @@
package com.provectus.kafka.ui.serdes.builtin;
import com.provectus.kafka.ui.serde.api.DeserializeResult;
import com.provectus.kafka.ui.serde.api.PropertyResolver;
import com.provectus.kafka.ui.serde.api.SchemaDescription;
import com.provectus.kafka.ui.serdes.BuiltInSerde;
import java.util.HexFormat;
import java.util.Map;
import java.util.Optional;
public class HexSerde implements BuiltInSerde {
private HexFormat deserializeHexFormat;
public static String name() {
return "Hex";
}
@Override
public void autoConfigure(PropertyResolver kafkaClusterProperties, PropertyResolver globalProperties) {
configure(" ", true);
}
@Override
public void configure(PropertyResolver serdeProperties,
PropertyResolver kafkaClusterProperties,
PropertyResolver globalProperties) {
String delim = serdeProperties.getProperty("delimiter", String.class).orElse(" ");
boolean uppercase = serdeProperties.getProperty("uppercase", Boolean.class).orElse(true);
configure(delim, uppercase);
}
private void configure(String delim, boolean uppercase) {
deserializeHexFormat = HexFormat.ofDelimiter(delim);
if (uppercase) {
deserializeHexFormat = deserializeHexFormat.withUpperCase();
}
}
@Override
public Optional<String> getDescription() {
return Optional.empty();
}
@Override
public Optional<SchemaDescription> getSchema(String topic, Target type) {
return Optional.empty();
}
@Override
public boolean canDeserialize(String topic, Target type) {
return true;
}
@Override
public boolean canSerialize(String topic, Target type) {
return true;
}
@Override
public Serializer serializer(String topic, Target type) {
return input -> {
input = input.trim();
// it is a hack to provide ability to sent empty array as a key/value
if (input.length() == 0) {
return new byte[] {};
}
return HexFormat.of().parseHex(prepareInputForParse(input));
};
}
// removing most-common delimiters and prefixes
private static String prepareInputForParse(String input) {
return input
.replaceAll(" ", "")
.replaceAll("#", "")
.replaceAll(":", "");
}
@Override
public Deserializer deserializer(String topic, Target type) {
return (headers, data) ->
new DeserializeResult(
deserializeHexFormat.formatHex(data),
DeserializeResult.Type.STRING,
Map.of()
);
}
}
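
The new serde is built entirely on java.util.HexFormat (JDK 17+). A small round-trip illustration of the default configuration (space delimiter, upper case), approximating what the deserializer and serializer above produce:

import java.util.Arrays;
import java.util.HexFormat;

class HexSerdeRoundTrip { // illustration only
  public static void main(String[] args) {
    byte[] value = {(byte) 0xCA, (byte) 0xFE, 0x01};

    // Deserialization side: bytes -> "CA FE 01" with the default delimiter/uppercase settings
    String rendered = HexFormat.ofDelimiter(" ").withUpperCase().formatHex(value);

    // Serialization side: delimiters are stripped first, then the string is parsed as plain hex
    byte[] parsed = HexFormat.of().parseHex(rendered.replaceAll(" ", ""));

    System.out.println(rendered + " -> " + Arrays.toString(parsed));
  }
}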

View file

@ -55,15 +55,11 @@ public class Int64Serde implements BuiltInSerde {
@Override
public Deserializer deserializer(String topic, Target type) {
return new Deserializer() {
@Override
public DeserializeResult deserialize(RecordHeaders headers, byte[] data) {
return new DeserializeResult(
return (headers, data) ->
new DeserializeResult(
String.valueOf(Longs.fromByteArray(data)),
DeserializeResult.Type.JSON,
Map.of()
);
}
};
}
}

View file

@ -1,10 +1,8 @@
package com.provectus.kafka.ui.serdes.builtin;
import com.google.common.primitives.Longs;
import com.google.common.primitives.UnsignedInteger;
import com.google.common.primitives.UnsignedLong;
import com.provectus.kafka.ui.serde.api.DeserializeResult;
import com.provectus.kafka.ui.serde.api.RecordHeaders;
import com.provectus.kafka.ui.serde.api.SchemaDescription;
import com.provectus.kafka.ui.serdes.BuiltInSerde;
import java.util.Map;
@ -32,7 +30,7 @@ public class UInt64Serde implements BuiltInSerde {
+ " \"minimum\" : 0, "
+ " \"maximum\" : %s "
+ "}",
UnsignedInteger.MAX_VALUE
UnsignedLong.MAX_VALUE
),
Map.of()
)
@ -56,15 +54,11 @@ public class UInt64Serde implements BuiltInSerde {
@Override
public Deserializer deserializer(String topic, Target type) {
return new Deserializer() {
@Override
public DeserializeResult deserialize(RecordHeaders headers, byte[] data) {
return new DeserializeResult(
return (headers, data) ->
new DeserializeResult(
UnsignedLong.fromLongBits(Longs.fromByteArray(data)).toString(),
DeserializeResult.Type.JSON,
Map.of()
);
}
};
}
}
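
The schema fix above matters because the previous upper bound was the 32-bit unsigned maximum applied to a 64-bit value; printing the two Guava constants shows the gap:

import com.google.common.primitives.UnsignedInteger;
import com.google.common.primitives.UnsignedLong;

class UnsignedMaxCheck { // illustration only
  public static void main(String[] args) {
    System.out.println(UnsignedInteger.MAX_VALUE); // 4294967295 (old, wrong bound for a 64-bit value)
    System.out.println(UnsignedLong.MAX_VALUE);    // 18446744073709551615 (new bound)
  }
}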

View file

@ -50,41 +50,35 @@ public class UuidBinarySerde implements BuiltInSerde {
@Override
public Serializer serializer(String topic, Target type) {
return new Serializer() {
@Override
public byte[] serialize(String input) {
UUID uuid = UUID.fromString(input);
ByteBuffer bb = ByteBuffer.wrap(new byte[16]);
if (mostSignificantBitsFirst) {
bb.putLong(uuid.getMostSignificantBits());
bb.putLong(uuid.getLeastSignificantBits());
} else {
bb.putLong(uuid.getLeastSignificantBits());
bb.putLong(uuid.getMostSignificantBits());
}
return bb.array();
return input -> {
UUID uuid = UUID.fromString(input);
ByteBuffer bb = ByteBuffer.wrap(new byte[16]);
if (mostSignificantBitsFirst) {
bb.putLong(uuid.getMostSignificantBits());
bb.putLong(uuid.getLeastSignificantBits());
} else {
bb.putLong(uuid.getLeastSignificantBits());
bb.putLong(uuid.getMostSignificantBits());
}
return bb.array();
};
}
@Override
public Deserializer deserializer(String topic, Target type) {
return new Deserializer() {
@Override
public DeserializeResult deserialize(RecordHeaders headers, byte[] data) {
if (data.length != 16) {
throw new ValidationException("UUID data should be 16 bytes, but it is " + data.length);
}
ByteBuffer bb = ByteBuffer.wrap(data);
long msb = bb.getLong();
long lsb = bb.getLong();
UUID uuid = mostSignificantBitsFirst ? new UUID(msb, lsb) : new UUID(lsb, msb);
return new DeserializeResult(
uuid.toString(),
DeserializeResult.Type.STRING,
Map.of()
);
return (headers, data) -> {
if (data.length != 16) {
throw new ValidationException("UUID data should be 16 bytes, but it is " + data.length);
}
ByteBuffer bb = ByteBuffer.wrap(data);
long msb = bb.getLong();
long lsb = bb.getLong();
UUID uuid = mostSignificantBitsFirst ? new UUID(msb, lsb) : new UUID(lsb, msb);
return new DeserializeResult(
uuid.toString(),
DeserializeResult.Type.STRING,
Map.of()
);
};
}
}

View file

@ -20,6 +20,7 @@ import io.confluent.kafka.schemaregistry.client.SchemaMetadata;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClientConfig;
import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
import io.confluent.kafka.schemaregistry.json.JsonSchema;
import io.confluent.kafka.schemaregistry.json.JsonSchemaProvider;
import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;
import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchemaProvider;
@ -217,7 +218,9 @@ public class SchemaRegistrySerde implements BuiltInSerde {
case AVRO -> new AvroJsonSchemaConverter()
.convert(basePath, ((AvroSchema) parsedSchema).rawSchema())
.toJson();
case JSON -> schema.getSchema();
case JSON ->
//need to use confluent JsonSchema since it includes resolved references
((JsonSchema) parsedSchema).rawSchema().toString();
};
}

View file

@ -2,12 +2,14 @@ package com.provectus.kafka.ui.service;
import com.google.common.collect.Streams;
import com.google.common.collect.Table;
import com.provectus.kafka.ui.emitter.EnhancedConsumer;
import com.provectus.kafka.ui.model.ConsumerGroupOrderingDTO;
import com.provectus.kafka.ui.model.InternalConsumerGroup;
import com.provectus.kafka.ui.model.InternalTopicConsumerGroup;
import com.provectus.kafka.ui.model.KafkaCluster;
import com.provectus.kafka.ui.model.SortOrderDTO;
import com.provectus.kafka.ui.service.rbac.AccessControlService;
import com.provectus.kafka.ui.util.ApplicationMetrics;
import com.provectus.kafka.ui.util.SslPropertiesUtil;
import java.util.ArrayList;
import java.util.Collection;
@ -26,11 +28,8 @@ import org.apache.kafka.clients.admin.ConsumerGroupDescription;
import org.apache.kafka.clients.admin.ConsumerGroupListing;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.ConsumerGroupState;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.BytesDeserializer;
import org.apache.kafka.common.utils.Bytes;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Mono;
@ -248,25 +247,27 @@ public class ConsumerGroupService {
.flatMap(adminClient -> adminClient.deleteConsumerGroups(List.of(groupId)));
}
public KafkaConsumer<Bytes, Bytes> createConsumer(KafkaCluster cluster) {
public EnhancedConsumer createConsumer(KafkaCluster cluster) {
return createConsumer(cluster, Map.of());
}
public KafkaConsumer<Bytes, Bytes> createConsumer(KafkaCluster cluster,
Map<String, Object> properties) {
public EnhancedConsumer createConsumer(KafkaCluster cluster,
Map<String, Object> properties) {
Properties props = new Properties();
SslPropertiesUtil.addKafkaSslProperties(cluster.getOriginalProperties().getSsl(), props);
props.putAll(cluster.getProperties());
props.put(ConsumerConfig.CLIENT_ID_CONFIG, "kafka-ui-consumer-" + System.currentTimeMillis());
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.getBootstrapServers());
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
props.put(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG, "false");
props.putAll(properties);
return new KafkaConsumer<>(props);
return new EnhancedConsumer(
props,
cluster.getPollingSettings().getPollingThrottler(),
ApplicationMetrics.forCluster(cluster)
);
}
}

View file

@ -28,7 +28,6 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nullable;
import lombok.RequiredArgsConstructor;
@ -39,7 +38,6 @@ import org.springframework.stereotype.Service;
import org.springframework.web.reactive.function.client.WebClientResponseException;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.function.Tuples;
@Service
@Slf4j
@ -61,39 +59,22 @@ public class KafkaConnectService {
public Flux<FullConnectorInfoDTO> getAllConnectors(final KafkaCluster cluster,
@Nullable final String search) {
return getConnects(cluster)
.flatMap(connect -> getConnectorNames(cluster, connect.getName()).map(cn -> Tuples.of(connect.getName(), cn)))
.flatMap(pair -> getConnector(cluster, pair.getT1(), pair.getT2()))
.flatMap(connector ->
getConnectorConfig(cluster, connector.getConnect(), connector.getName())
.map(config -> InternalConnectInfo.builder()
.connector(connector)
.config(config)
.build()
)
)
.flatMap(connectInfo -> {
ConnectorDTO connector = connectInfo.getConnector();
return getConnectorTasks(cluster, connector.getConnect(), connector.getName())
.collectList()
.map(tasks -> InternalConnectInfo.builder()
.connector(connector)
.config(connectInfo.getConfig())
.tasks(tasks)
.build()
);
})
.flatMap(connectInfo -> {
ConnectorDTO connector = connectInfo.getConnector();
return getConnectorTopics(cluster, connector.getConnect(), connector.getName())
.map(ct -> InternalConnectInfo.builder()
.connector(connector)
.config(connectInfo.getConfig())
.tasks(connectInfo.getTasks())
.topics(ct.getTopics())
.build()
);
})
.map(kafkaConnectMapper::fullConnectorInfoFromTuple)
.flatMap(connect ->
getConnectorNamesWithErrorsSuppress(cluster, connect.getName())
.flatMap(connectorName ->
Mono.zip(
getConnector(cluster, connect.getName(), connectorName),
getConnectorConfig(cluster, connect.getName(), connectorName),
getConnectorTasks(cluster, connect.getName(), connectorName).collectList(),
getConnectorTopics(cluster, connect.getName(), connectorName)
).map(tuple ->
InternalConnectInfo.builder()
.connector(tuple.getT1())
.config(tuple.getT2())
.tasks(tuple.getT3())
.topics(tuple.getT4().getTopics())
.build())))
.map(kafkaConnectMapper::fullConnectorInfo)
.filter(matchesSearchTerm(search));
}
@ -132,6 +113,11 @@ public class KafkaConnectService {
.flatMapMany(Flux::fromIterable);
}
// returns empty flux if there was an error communicating with Connect
public Flux<String> getConnectorNamesWithErrorsSuppress(KafkaCluster cluster, String connectName) {
return getConnectorNames(cluster, connectName).onErrorComplete();
}
@SneakyThrows
private List<String> parseConnectorsNamesStringToList(String json) {
return objectMapper.readValue(json, new TypeReference<>() {

View file

@ -15,6 +15,8 @@ import com.provectus.kafka.ui.exception.ValidationException;
import com.provectus.kafka.ui.util.KafkaVersion;
import com.provectus.kafka.ui.util.annotation.KafkaClientInternalsDependant;
import java.io.Closeable;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
@ -129,38 +131,41 @@ public class ReactiveAdminClient implements Closeable {
Set<SupportedFeature> features,
boolean topicDeletionIsAllowed) {
private static Mono<ConfigRelatedInfo> extract(AdminClient ac, int controllerId) {
return loadBrokersConfig(ac, List.of(controllerId))
.map(map -> map.isEmpty() ? List.<ConfigEntry>of() : map.get(controllerId))
.flatMap(configs -> {
String version = "1.0-UNKNOWN";
boolean topicDeletionEnabled = true;
for (ConfigEntry entry : configs) {
if (entry.name().contains("inter.broker.protocol.version")) {
version = entry.value();
}
if (entry.name().equals("delete.topic.enable")) {
topicDeletionEnabled = Boolean.parseBoolean(entry.value());
}
}
var builder = ConfigRelatedInfo.builder()
.version(version)
.topicDeletionIsAllowed(topicDeletionEnabled);
return SupportedFeature.forVersion(ac, version)
.map(features -> builder.features(features).build());
});
static final Duration UPDATE_DURATION = Duration.of(1, ChronoUnit.HOURS);
private static Mono<ConfigRelatedInfo> extract(AdminClient ac) {
return ReactiveAdminClient.describeClusterImpl(ac, Set.of())
.flatMap(desc -> {
// choosing node from which we will get configs (starting with controller)
var targetNodeId = Optional.ofNullable(desc.controller)
.map(Node::id)
.orElse(desc.getNodes().iterator().next().id());
return loadBrokersConfig(ac, List.of(targetNodeId))
.map(map -> map.isEmpty() ? List.<ConfigEntry>of() : map.get(targetNodeId))
.flatMap(configs -> {
String version = "1.0-UNKNOWN";
boolean topicDeletionEnabled = true;
for (ConfigEntry entry : configs) {
if (entry.name().contains("inter.broker.protocol.version")) {
version = entry.value();
}
if (entry.name().equals("delete.topic.enable")) {
topicDeletionEnabled = Boolean.parseBoolean(entry.value());
}
}
final String finalVersion = version;
final boolean finalTopicDeletionEnabled = topicDeletionEnabled;
return SupportedFeature.forVersion(ac, version)
.map(features -> new ConfigRelatedInfo(finalVersion, features, finalTopicDeletionEnabled));
});
})
.cache(UPDATE_DURATION);
}
}
public static Mono<ReactiveAdminClient> create(AdminClient adminClient) {
return describeClusterImpl(adminClient, Set.of())
// choosing node from which we will get configs (starting with controller)
.flatMap(descr -> descr.controller != null
? Mono.just(descr.controller)
: Mono.justOrEmpty(descr.nodes.stream().findFirst())
)
.flatMap(node -> ConfigRelatedInfo.extract(adminClient, node.id()))
.map(info -> new ReactiveAdminClient(adminClient, info));
Mono<ConfigRelatedInfo> configRelatedInfoMono = ConfigRelatedInfo.extract(adminClient);
return configRelatedInfoMono.map(info -> new ReactiveAdminClient(adminClient, configRelatedInfoMono, info));
}
@@ -170,7 +175,7 @@ public class ReactiveAdminClient implements Closeable {
.doOnError(th -> !(th instanceof SecurityDisabledException)
&& !(th instanceof InvalidRequestException)
&& !(th instanceof UnsupportedVersionException),
th -> log.warn("Error checking if security enabled", th))
th -> log.debug("Error checking if security enabled", th))
.onErrorReturn(false);
}
@@ -202,6 +207,8 @@ public class ReactiveAdminClient implements Closeable {
@Getter(AccessLevel.PACKAGE) // visible for testing
private final AdminClient client;
private final Mono<ConfigRelatedInfo> configRelatedInfoMono;
private volatile ConfigRelatedInfo configRelatedInfo;
public Set<SupportedFeature> getClusterFeatures() {
@@ -228,7 +235,7 @@ public class ReactiveAdminClient implements Closeable {
if (controller == null) {
return Mono.empty();
}
return ConfigRelatedInfo.extract(client, controller.id())
return configRelatedInfoMono
.doOnNext(info -> this.configRelatedInfo = info)
.then();
}
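Note on the change above: the per-request controller lookup is replaced by a single Mono cached for UPDATE_DURATION, so repeated feature/version checks reuse the last result and the source is only re-queried once the entry expires. A minimal, self-contained sketch of that Reactor behaviour (illustrative only, not project code):
import java.time.Duration;
import reactor.core.publisher.Mono;
class MonoTtlCacheSketch {
  public static void main(String[] args) {
    // cache(ttl) replays the last emitted value until the TTL elapses,
    // then the next subscriber triggers a fresh evaluation of the source.
    Mono<Long> cached = Mono.fromSupplier(System::nanoTime)
        .cache(Duration.ofHours(1)); // mirrors UPDATE_DURATION above
    Long first = cached.block();
    Long second = cached.block();
    System.out.println(first.equals(second)); // true while the cached entry is fresh
  }
}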

View file

@@ -14,8 +14,7 @@ import com.provectus.kafka.ui.sr.model.CompatibilityLevelChange;
import com.provectus.kafka.ui.sr.model.NewSubject;
import com.provectus.kafka.ui.sr.model.SchemaSubject;
import com.provectus.kafka.ui.util.ReactiveFailover;
import com.provectus.kafka.ui.util.WebClientConfigurator;
import java.io.IOException;
import java.nio.charset.Charset;
import java.util.List;
import java.util.stream.Collectors;
import lombok.AllArgsConstructor;
@@ -92,7 +91,7 @@ public class SchemaRegistryService {
private Mono<SubjectWithCompatibilityLevel> getSchemaSubject(KafkaCluster cluster, String schemaName,
String version) {
return api(cluster)
.mono(c -> c.getSubjectVersion(schemaName, version))
.mono(c -> c.getSubjectVersion(schemaName, version, false))
.zipWith(getSchemaCompatibilityInfoOrGlobal(cluster, schemaName))
.map(t -> new SubjectWithCompatibilityLevel(t.getT1(), t.getT2()))
.onErrorResume(WebClientResponseException.NotFound.class, th -> Mono.error(new SchemaNotFoundException()));
@@ -126,7 +125,7 @@ public class SchemaRegistryService {
.onErrorMap(WebClientResponseException.Conflict.class,
th -> new SchemaCompatibilityException())
.onErrorMap(WebClientResponseException.UnprocessableEntity.class,
th -> new ValidationException("Invalid schema"))
th -> new ValidationException("Invalid schema. Error from registry: " + th.getResponseBodyAsString()))
.then(getLatestSchemaVersionBySubject(cluster, subject));
}

View file

@@ -1,16 +1,44 @@
package com.provectus.kafka.ui.service.acl;
import static org.apache.kafka.common.acl.AclOperation.ALL;
import static org.apache.kafka.common.acl.AclOperation.CREATE;
import static org.apache.kafka.common.acl.AclOperation.DESCRIBE;
import static org.apache.kafka.common.acl.AclOperation.IDEMPOTENT_WRITE;
import static org.apache.kafka.common.acl.AclOperation.READ;
import static org.apache.kafka.common.acl.AclOperation.WRITE;
import static org.apache.kafka.common.acl.AclPermissionType.ALLOW;
import static org.apache.kafka.common.resource.PatternType.LITERAL;
import static org.apache.kafka.common.resource.PatternType.PREFIXED;
import static org.apache.kafka.common.resource.ResourceType.CLUSTER;
import static org.apache.kafka.common.resource.ResourceType.GROUP;
import static org.apache.kafka.common.resource.ResourceType.TOPIC;
import static org.apache.kafka.common.resource.ResourceType.TRANSACTIONAL_ID;
import com.google.common.collect.Sets;
import com.provectus.kafka.ui.model.CreateConsumerAclDTO;
import com.provectus.kafka.ui.model.CreateProducerAclDTO;
import com.provectus.kafka.ui.model.CreateStreamAppAclDTO;
import com.provectus.kafka.ui.model.KafkaCluster;
import com.provectus.kafka.ui.service.AdminClientService;
import com.provectus.kafka.ui.service.ReactiveAdminClient;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import javax.annotation.Nullable;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.common.acl.AccessControlEntry;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.resource.Resource;
import org.apache.kafka.common.resource.ResourcePattern;
import org.apache.kafka.common.resource.ResourcePatternFilter;
import org.apache.kafka.common.resource.ResourceType;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@@ -22,11 +50,14 @@ public class AclsService {
private final AdminClientService adminClientService;
public Mono<Void> createAcl(KafkaCluster cluster, AclBinding aclBinding) {
var aclString = AclCsv.createAclString(aclBinding);
log.info("CREATING ACL: [{}]", aclString);
return adminClientService.get(cluster)
.flatMap(ac -> ac.createAcls(List.of(aclBinding)))
.doOnSuccess(v -> log.info("ACL CREATED: [{}]", aclString));
.flatMap(ac -> createAclsWithLogging(ac, List.of(aclBinding)));
}
private Mono<Void> createAclsWithLogging(ReactiveAdminClient ac, Collection<AclBinding> bindings) {
bindings.forEach(b -> log.info("CREATING ACL: [{}]", AclCsv.createAclString(b)));
return ac.createAcls(bindings)
.doOnSuccess(v -> bindings.forEach(b -> log.info("ACL CREATED: [{}]", AclCsv.createAclString(b))));
}
public Mono<Void> deleteAcl(KafkaCluster cluster, AclBinding aclBinding) {
@@ -92,4 +123,150 @@ public class AclsService {
}
}
// creates allow binding for resources by prefix or specific names list
private List<AclBinding> createAllowBindings(ResourceType resourceType,
List<AclOperation> opsToAllow,
String principal,
String host,
@Nullable String resourcePrefix,
@Nullable Collection<String> resourceNames) {
List<AclBinding> bindings = new ArrayList<>();
if (resourcePrefix != null) {
for (var op : opsToAllow) {
bindings.add(
new AclBinding(
new ResourcePattern(resourceType, resourcePrefix, PREFIXED),
new AccessControlEntry(principal, host, op, ALLOW)));
}
}
if (!CollectionUtils.isEmpty(resourceNames)) {
resourceNames.stream()
.distinct()
.forEach(resource ->
opsToAllow.forEach(op ->
bindings.add(
new AclBinding(
new ResourcePattern(resourceType, resource, LITERAL),
new AccessControlEntry(principal, host, op, ALLOW)))));
}
return bindings;
}
public Mono<Void> createConsumerAcl(KafkaCluster cluster, CreateConsumerAclDTO request) {
return adminClientService.get(cluster)
.flatMap(ac -> createAclsWithLogging(ac, createConsumerBindings(request)))
.then();
}
//Read, Describe on topics, Read on consumerGroups
private List<AclBinding> createConsumerBindings(CreateConsumerAclDTO request) {
List<AclBinding> bindings = new ArrayList<>();
bindings.addAll(
createAllowBindings(TOPIC,
List.of(READ, DESCRIBE),
request.getPrincipal(),
request.getHost(),
request.getTopicsPrefix(),
request.getTopics()));
bindings.addAll(
createAllowBindings(
GROUP,
List.of(READ),
request.getPrincipal(),
request.getHost(),
request.getConsumerGroupsPrefix(),
request.getConsumerGroups()));
return bindings;
}
public Mono<Void> createProducerAcl(KafkaCluster cluster, CreateProducerAclDTO request) {
return adminClientService.get(cluster)
.flatMap(ac -> createAclsWithLogging(ac, createProducerBindings(request)))
.then();
}
//Write, Describe, Create permission on topics, Write, Describe on transactionalIds
//IDEMPOTENT_WRITE on cluster if idempotent is enabled
private List<AclBinding> createProducerBindings(CreateProducerAclDTO request) {
List<AclBinding> bindings = new ArrayList<>();
bindings.addAll(
createAllowBindings(
TOPIC,
List.of(WRITE, DESCRIBE, CREATE),
request.getPrincipal(),
request.getHost(),
request.getTopicsPrefix(),
request.getTopics()));
bindings.addAll(
createAllowBindings(
TRANSACTIONAL_ID,
List.of(WRITE, DESCRIBE),
request.getPrincipal(),
request.getHost(),
request.getTransactionsIdPrefix(),
Optional.ofNullable(request.getTransactionalId()).map(List::of).orElse(null)));
if (Boolean.TRUE.equals(request.getIdempotent())) {
bindings.addAll(
createAllowBindings(
CLUSTER,
List.of(IDEMPOTENT_WRITE),
request.getPrincipal(),
request.getHost(),
null,
List.of(Resource.CLUSTER_NAME))); // cluster name is a const string in ACL api
}
return bindings;
}
public Mono<Void> createStreamAppAcl(KafkaCluster cluster, CreateStreamAppAclDTO request) {
return adminClientService.get(cluster)
.flatMap(ac -> createAclsWithLogging(ac, createStreamAppBindings(request)))
.then();
}
// Read on input topics, Write on output topics
// ALL on applicationId-prefixed Groups and Topics
private List<AclBinding> createStreamAppBindings(CreateStreamAppAclDTO request) {
List<AclBinding> bindings = new ArrayList<>();
bindings.addAll(
createAllowBindings(
TOPIC,
List.of(READ),
request.getPrincipal(),
request.getHost(),
null,
request.getInputTopics()));
bindings.addAll(
createAllowBindings(
TOPIC,
List.of(WRITE),
request.getPrincipal(),
request.getHost(),
null,
request.getOutputTopics()));
bindings.addAll(
createAllowBindings(
GROUP,
List.of(ALL),
request.getPrincipal(),
request.getHost(),
request.getApplicationId(),
null));
bindings.addAll(
createAllowBindings(
TOPIC,
List.of(ALL),
request.getPrincipal(),
request.getHost(),
request.getApplicationId(),
null));
return bindings;
}
}
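A hedged usage sketch of the new consumer-ACL helper, using only the DTO accessors that appear in this diff; the principal, host, topic and group values are made up, and the wiring of aclsService/cluster is assumed:
// Illustrative only: READ/DESCRIBE on the listed topics, READ on the prefixed groups.
Mono<Void> grantConsumerAccess(AclsService aclsService, KafkaCluster cluster) {
  return aclsService.createConsumerAcl(
      cluster,
      new CreateConsumerAclDTO()
          .principal("User:svc-analytics") // hypothetical principal
          .host("*")                       // hypothetical host
          .topics(List.of("orders", "payments"))
          .consumerGroupsPrefix("analytics-"));
}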

View file

@@ -1,9 +1,9 @@
package com.provectus.kafka.ui.service.analyze;
import com.provectus.kafka.ui.emitter.EmptyPollsCounter;
import com.provectus.kafka.ui.emitter.EnhancedConsumer;
import com.provectus.kafka.ui.emitter.OffsetsInfo;
import com.provectus.kafka.ui.emitter.PollingSettings;
import com.provectus.kafka.ui.emitter.PollingThrottler;
import com.provectus.kafka.ui.exception.TopicAnalysisException;
import com.provectus.kafka.ui.model.KafkaCluster;
import com.provectus.kafka.ui.model.TopicAnalysisDTO;
@@ -20,11 +20,9 @@ import java.util.stream.IntStream;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.utils.Bytes;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
@@ -84,12 +82,11 @@ public class TopicAnalysisService {
private final int partitionsCnt;
private final long approxNumberOfMsgs;
private final EmptyPollsCounter emptyPollsCounter;
private final PollingThrottler throttler;
private final TopicAnalysisStats totalStats = new TopicAnalysisStats();
private final Map<Integer, TopicAnalysisStats> partitionStats = new HashMap<>();
private final KafkaConsumer<Bytes, Bytes> consumer;
private final EnhancedConsumer consumer;
AnalysisTask(KafkaCluster cluster, TopicIdentity topicId, int partitionsCnt,
long approxNumberOfMsgs, PollingSettings pollingSettings) {
@@ -104,7 +101,6 @@ public class TopicAnalysisService {
ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "100000"
)
);
this.throttler = pollingSettings.getPollingThrottler();
this.emptyPollsCounter = pollingSettings.createEmptyPollsCounter();
}
@@ -127,9 +123,8 @@ public class TopicAnalysisService {
var offsetsInfo = new OffsetsInfo(consumer, topicId.topicName);
while (!offsetsInfo.assignedPartitionsFullyPolled() && !emptyPollsCounter.noDataEmptyPollsReached()) {
var polled = consumer.poll(Duration.ofSeconds(3));
throttler.throttleAfterPoll(polled);
emptyPollsCounter.count(polled);
var polled = consumer.pollEnhanced(Duration.ofSeconds(3));
emptyPollsCounter.count(polled.count());
polled.forEach(r -> {
totalStats.apply(r);
partitionStats.get(r.partition()).apply(r);

View file

@@ -13,6 +13,7 @@ import com.provectus.kafka.ui.service.ClustersStorage;
import com.provectus.kafka.ui.service.ReactiveAdminClient;
import java.io.Closeable;
import java.io.IOException;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
@@ -37,6 +38,7 @@ import reactor.core.publisher.Signal;
public class AuditService implements Closeable {
private static final Mono<AuthenticatedUser> NO_AUTH_USER = Mono.just(new AuthenticatedUser("Unknown", Set.of()));
private static final Duration BLOCK_TIMEOUT = Duration.ofSeconds(5);
private static final String DEFAULT_AUDIT_TOPIC_NAME = "__kui-audit-log";
private static final int DEFAULT_AUDIT_TOPIC_PARTITIONS = 1;
@@ -56,14 +58,8 @@ public class AuditService implements Closeable {
public AuditService(AdminClientService adminClientService, ClustersStorage clustersStorage) {
Map<String, AuditWriter> auditWriters = new HashMap<>();
for (var cluster : clustersStorage.getKafkaClusters()) {
ReactiveAdminClient adminClient;
try {
adminClient = adminClientService.get(cluster).block();
} catch (Exception e) {
printAuditInitError(cluster, "Error connect to cluster", e);
continue;
}
createAuditWriter(cluster, adminClient, () -> createProducer(cluster, AUDIT_PRODUCER_CONFIG))
Supplier<ReactiveAdminClient> adminClientSupplier = () -> adminClientService.get(cluster).block(BLOCK_TIMEOUT);
createAuditWriter(cluster, adminClientSupplier, () -> createProducer(cluster, AUDIT_PRODUCER_CONFIG))
.ifPresent(writer -> auditWriters.put(cluster.getName(), writer));
}
this.auditWriters = auditWriters;
@@ -76,7 +72,7 @@ public class AuditService implements Closeable {
@VisibleForTesting
static Optional<AuditWriter> createAuditWriter(KafkaCluster cluster,
ReactiveAdminClient ac,
Supplier<ReactiveAdminClient> acSupplier,
Supplier<KafkaProducer<byte[], byte[]>> producerFactory) {
var auditProps = cluster.getOriginalProperties().getAudit();
if (auditProps == null) {
@@ -87,32 +83,54 @@ public class AuditService implements Closeable {
if (!topicAudit && !consoleAudit) {
return Optional.empty();
}
String auditTopicName = Optional.ofNullable(auditProps.getTopic()).orElse(DEFAULT_AUDIT_TOPIC_NAME);
@Nullable KafkaProducer<byte[], byte[]> producer = null;
if (topicAudit && createTopicIfNeeded(cluster, ac, auditTopicName, auditProps)) {
producer = producerFactory.get();
if (!topicAudit) {
log.info("Audit initialization finished for cluster '{}' (console only)", cluster.getName());
return Optional.of(consoleOnlyWriter(cluster));
}
log.info("Audit service initialized for cluster '{}'", cluster.getName());
String auditTopicName = Optional.ofNullable(auditProps.getTopic()).orElse(DEFAULT_AUDIT_TOPIC_NAME);
boolean topicAuditCanBeDone = createTopicIfNeeded(cluster, acSupplier, auditTopicName, auditProps);
if (!topicAuditCanBeDone) {
if (consoleAudit) {
log.info(
"Audit initialization finished for cluster '{}' (console only, topic audit init failed)",
cluster.getName()
);
return Optional.of(consoleOnlyWriter(cluster));
}
return Optional.empty();
}
log.info("Audit initialization finished for cluster '{}'", cluster.getName());
return Optional.of(
new AuditWriter(
cluster.getName(),
auditTopicName,
producer,
producerFactory.get(),
consoleAudit ? AUDIT_LOGGER : null
)
);
}
private static AuditWriter consoleOnlyWriter(KafkaCluster cluster) {
return new AuditWriter(cluster.getName(), null, null, AUDIT_LOGGER);
}
/**
* Returns true if the audit topic already exists or was created, i.e. producing to it can be enabled.
*/
private static boolean createTopicIfNeeded(KafkaCluster cluster,
ReactiveAdminClient ac,
Supplier<ReactiveAdminClient> acSupplier,
String auditTopicName,
ClustersProperties.AuditProperties auditProps) {
ReactiveAdminClient ac;
try {
ac = acSupplier.get();
} catch (Exception e) {
printAuditInitError(cluster, "Error while connecting to the cluster", e);
return false;
}
boolean topicExists;
try {
topicExists = ac.listTopics(true).block().contains(auditTopicName);
topicExists = ac.listTopics(true).block(BLOCK_TIMEOUT).contains(auditTopicName);
} catch (Exception e) {
printAuditInitError(cluster, "Error checking audit topic existence", e);
return false;
@@ -130,7 +148,7 @@ public class AuditService implements Closeable {
.ifPresent(topicConfig::putAll);
log.info("Creating audit topic '{}' for cluster '{}'", auditTopicName, cluster.getName());
ac.createTopic(auditTopicName, topicPartitions, null, topicConfig).block();
ac.createTopic(auditTopicName, topicPartitions, null, topicConfig).block(BLOCK_TIMEOUT);
log.info("Audit topic created for cluster '{}'", cluster.getName());
return true;
} catch (Exception e) {
@@ -142,7 +160,7 @@ public class AuditService implements Closeable {
private static void printAuditInitError(KafkaCluster cluster, String errorMsg, Exception cause) {
log.error("-----------------------------------------------------------------");
log.error(
"Error initializing Audit Service for cluster '{}'. Audit will be disabled. See error below: ",
"Error initializing Audit for cluster '{}'. Audit will be disabled. See error below: ",
cluster.getName()
);
log.error("{}", errorMsg, cause);

View file

@@ -18,7 +18,7 @@ import org.slf4j.Logger;
@Slf4j
record AuditWriter(String clusterName,
String targetTopic,
@Nullable String targetTopic,
@Nullable KafkaProducer<byte[], byte[]> producer,
@Nullable Logger consoleLogger) implements Closeable {
@@ -43,7 +43,7 @@ record AuditWriter(String clusterName,
if (consoleLogger != null) {
consoleLogger.info(json);
}
if (producer != null) {
if (targetTopic != null && producer != null) {
producer.send(
new ProducerRecord<>(targetTopic, null, json.getBytes(UTF_8)),
(metadata, ex) -> {

View file

@@ -25,7 +25,7 @@ class ConnectorsExporter {
Flux<DataEntityList> export(KafkaCluster cluster) {
return kafkaConnectService.getConnects(cluster)
.flatMap(connect -> kafkaConnectService.getConnectorNames(cluster, connect.getName())
.flatMap(connect -> kafkaConnectService.getConnectorNamesWithErrorsSuppress(cluster, connect.getName())
.flatMap(connectorName -> kafkaConnectService.getConnector(cluster, connect.getName(), connectorName))
.flatMap(connectorDTO ->
kafkaConnectService.getConnectorTopics(cluster, connect.getName(), connectorDTO.getName())

View file

@@ -5,13 +5,10 @@ import com.google.common.base.Preconditions;
import com.provectus.kafka.ui.model.KafkaCluster;
import com.provectus.kafka.ui.service.KafkaConnectService;
import com.provectus.kafka.ui.service.StatisticsCache;
import java.util.List;
import java.util.function.Predicate;
import java.util.regex.Pattern;
import lombok.SneakyThrows;
import org.opendatadiscovery.client.ApiClient;
import org.opendatadiscovery.client.api.OpenDataDiscoveryIngestionApi;
import org.opendatadiscovery.client.model.DataEntity;
import org.opendatadiscovery.client.model.DataEntityList;
import org.opendatadiscovery.client.model.DataSource;
import org.opendatadiscovery.client.model.DataSourceList;
@@ -68,14 +65,14 @@ class OddExporter {
private Mono<Void> exportTopics(KafkaCluster c) {
return createKafkaDataSource(c)
.thenMany(topicsExporter.export(c))
.concatMap(this::sentDataEntities)
.concatMap(this::sendDataEntities)
.then();
}
private Mono<Void> exportKafkaConnects(KafkaCluster cluster) {
return createConnectDataSources(cluster)
.thenMany(connectorsExporter.export(cluster))
.concatMap(this::sentDataEntities)
.concatMap(this::sendDataEntities)
.then();
}
@@ -99,7 +96,7 @@ class OddExporter {
);
}
private Mono<Void> sentDataEntities(DataEntityList dataEntityList) {
private Mono<Void> sendDataEntities(DataEntityList dataEntityList) {
return oddApi.postDataEntityList(dataEntityList);
}

View file

@@ -0,0 +1,55 @@
package com.provectus.kafka.ui.service.integration.odd;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.provectus.kafka.ui.sr.api.KafkaSrClientApi;
import com.provectus.kafka.ui.sr.model.SchemaReference;
import java.util.List;
import java.util.Optional;
import javax.annotation.Nullable;
import reactor.core.publisher.Mono;
// logic copied from AbstractSchemaProvider:resolveReferences
// https://github.com/confluentinc/schema-registry/blob/fd59613e2c5adf62e36705307f420712e4c8c1ea/client/src/main/java/io/confluent/kafka/schemaregistry/AbstractSchemaProvider.java#L54
class SchemaReferencesResolver {
private final KafkaSrClientApi client;
SchemaReferencesResolver(KafkaSrClientApi client) {
this.client = client;
}
Mono<ImmutableMap<String, String>> resolve(List<SchemaReference> refs) {
return resolveReferences(refs, new Resolving(ImmutableMap.of(), ImmutableSet.of()))
.map(Resolving::resolved);
}
private record Resolving(ImmutableMap<String, String> resolved, ImmutableSet<String> visited) {
Resolving visit(String name) {
return new Resolving(resolved, ImmutableSet.<String>builder().addAll(visited).add(name).build());
}
Resolving resolve(String ref, String schema) {
return new Resolving(ImmutableMap.<String, String>builder().putAll(resolved).put(ref, schema).build(), visited);
}
}
private Mono<Resolving> resolveReferences(@Nullable List<SchemaReference> refs, Resolving initState) {
Mono<Resolving> result = Mono.just(initState);
for (SchemaReference reference : Optional.ofNullable(refs).orElse(List.of())) {
result = result.flatMap(state -> {
if (state.visited().contains(reference.getName())) {
return Mono.just(state);
} else {
final var newState = state.visit(reference.getName());
return client.getSubjectVersion(reference.getSubject(), String.valueOf(reference.getVersion()), true)
.flatMap(subj ->
resolveReferences(subj.getReferences(), newState)
.map(withNewRefs -> withNewRefs.resolve(reference.getName(), subj.getSchema())));
}
});
}
return result;
}
}
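A short usage sketch (subject names and versions are hypothetical): the resolver walks references depth-first, skips names it has already visited, and returns a reference-name to schema map that can back a composite schema:
// Illustrative only; srClient is an already-configured KafkaSrClientApi.
Mono<ImmutableMap<String, String>> resolvedRefs =
    new SchemaReferencesResolver(srClient)
        .resolve(List.of(new SchemaReference()
            .name("MyRecord.proto")
            .subject("my-record-ref")
            .version(1)));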

View file

@@ -5,6 +5,7 @@ import com.provectus.kafka.ui.model.KafkaCluster;
import com.provectus.kafka.ui.model.Statistics;
import com.provectus.kafka.ui.service.StatisticsCache;
import com.provectus.kafka.ui.service.integration.odd.schema.DataSetFieldsExtractors;
import com.provectus.kafka.ui.sr.model.SchemaSubject;
import java.net.URI;
import java.util.List;
import java.util.Map;
@@ -24,6 +25,8 @@ import org.opendatadiscovery.oddrn.model.KafkaPath;
import org.springframework.web.reactive.function.client.WebClientResponseException;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.function.Tuple2;
import reactor.util.function.Tuples;
@Slf4j
@RequiredArgsConstructor
@@ -38,6 +41,8 @@ class TopicsExporter {
return Flux.fromIterable(stats.getTopicDescriptions().keySet())
.filter(topicFilter)
.flatMap(topic -> createTopicDataEntity(cluster, topic, stats))
.onErrorContinue(
(th, topic) -> log.warn("Error exporting data for topic {}, cluster {}", topic, cluster.getName(), th))
.buffer(100)
.map(topicsEntities ->
new DataEntityList()
@@ -89,23 +94,29 @@ class TopicsExporter {
.build();
}
//returns empty list if schemaRegistry is not configured or assumed subject not found
private Mono<List<DataSetField>> getTopicSchema(KafkaCluster cluster,
String topic,
KafkaPath topicOddrn,
//currently we only retrieve value schema
boolean isKey) {
if (cluster.getSchemaRegistryClient() == null) {
return Mono.just(List.of());
}
String subject = topic + (isKey ? "-key" : "-value");
return cluster.getSchemaRegistryClient()
.mono(client -> client.getSubjectVersion(subject, "latest"))
.map(subj -> DataSetFieldsExtractors.extract(subj, topicOddrn, isKey))
return getSubjWithResolvedRefs(cluster, subject)
.map(t -> DataSetFieldsExtractors.extract(t.getT1(), t.getT2(), topicOddrn, isKey))
.onErrorResume(WebClientResponseException.NotFound.class, th -> Mono.just(List.of()))
.onErrorResume(th -> true, th -> {
log.warn("Error retrieving subject {} for cluster {}", subject, cluster.getName(), th);
return Mono.just(List.of());
});
.onErrorMap(WebClientResponseException.class, err ->
new IllegalStateException("Error retrieving subject %s".formatted(subject), err));
}
private Mono<Tuple2<SchemaSubject, Map<String, String>>> getSubjWithResolvedRefs(KafkaCluster cluster,
String subjectName) {
return cluster.getSchemaRegistryClient()
.mono(client ->
client.getSubjectVersion(subjectName, "latest", false)
.flatMap(subj -> new SchemaReferencesResolver(client).resolve(subj.getReferences())
.map(resolvedRefs -> Tuples.of(subj, resolvedRefs))));
}
}

View file

@@ -1,7 +1,7 @@
package com.provectus.kafka.ui.service.integration.odd.schema;
import com.google.common.collect.ImmutableSet;
import com.provectus.kafka.ui.sr.model.SchemaSubject;
import io.confluent.kafka.schemaregistry.avro.AvroSchema;
import java.util.ArrayList;
import java.util.List;
import org.apache.avro.Schema;
@@ -14,8 +14,8 @@ final class AvroExtractor {
private AvroExtractor() {
}
static List<DataSetField> extract(SchemaSubject subject, KafkaPath topicOddrn, boolean isKey) {
var schema = new Schema.Parser().parse(subject.getSchema());
static List<DataSetField> extract(AvroSchema avroSchema, KafkaPath topicOddrn, boolean isKey) {
var schema = avroSchema.rawSchema();
List<DataSetField> result = new ArrayList<>();
result.add(DataSetFieldsExtractors.rootField(topicOddrn, isKey));
extract(

View file

@@ -2,7 +2,11 @@ package com.provectus.kafka.ui.service.integration.odd.schema;
import com.provectus.kafka.ui.sr.model.SchemaSubject;
import com.provectus.kafka.ui.sr.model.SchemaType;
import io.confluent.kafka.schemaregistry.avro.AvroSchema;
import io.confluent.kafka.schemaregistry.json.JsonSchema;
import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.opendatadiscovery.client.model.DataSetField;
import org.opendatadiscovery.client.model.DataSetFieldType;
@@ -10,12 +14,18 @@ import org.opendatadiscovery.oddrn.model.KafkaPath;
public final class DataSetFieldsExtractors {
public static List<DataSetField> extract(SchemaSubject subject, KafkaPath topicOddrn, boolean isKey) {
public static List<DataSetField> extract(SchemaSubject subject,
Map<String, String> resolvedRefs,
KafkaPath topicOddrn,
boolean isKey) {
SchemaType schemaType = Optional.ofNullable(subject.getSchemaType()).orElse(SchemaType.AVRO);
return switch (schemaType) {
case AVRO -> AvroExtractor.extract(subject, topicOddrn, isKey);
case JSON -> JsonSchemaExtractor.extract(subject, topicOddrn, isKey);
case PROTOBUF -> ProtoExtractor.extract(subject, topicOddrn, isKey);
case AVRO -> AvroExtractor.extract(
new AvroSchema(subject.getSchema(), List.of(), resolvedRefs, null), topicOddrn, isKey);
case JSON -> JsonSchemaExtractor.extract(
new JsonSchema(subject.getSchema(), List.of(), resolvedRefs, null), topicOddrn, isKey);
case PROTOBUF -> ProtoExtractor.extract(
new ProtobufSchema(subject.getSchema(), List.of(), resolvedRefs, null, null), topicOddrn, isKey);
};
}
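A hedged sketch of calling the new signature, assuming the caller already has a fetched SchemaSubject, the resolved-references map and the topic oddrn at hand:
// Illustrative only: build dataset fields for the value schema of a topic.
List<DataSetField> fields = DataSetFieldsExtractors.extract(
    schemaSubject,   // SchemaSubject fetched from the schema registry
    resolvedRefs,    // Map<String, String> produced by SchemaReferencesResolver
    topicOddrn,      // KafkaPath identifying the topic
    false);          // false -> value schema, true -> key schema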

View file

@@ -30,8 +30,8 @@ final class JsonSchemaExtractor {
private JsonSchemaExtractor() {
}
static List<DataSetField> extract(SchemaSubject subject, KafkaPath topicOddrn, boolean isKey) {
Schema schema = new JsonSchema(subject.getSchema()).rawSchema();
static List<DataSetField> extract(JsonSchema jsonSchema, KafkaPath topicOddrn, boolean isKey) {
Schema schema = jsonSchema.rawSchema();
List<DataSetField> result = new ArrayList<>();
result.add(DataSetFieldsExtractors.rootField(topicOddrn, isKey));
extract(

View file

@@ -15,7 +15,6 @@ import com.google.protobuf.Timestamp;
import com.google.protobuf.UInt32Value;
import com.google.protobuf.UInt64Value;
import com.google.protobuf.Value;
import com.provectus.kafka.ui.sr.model.SchemaSubject;
import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;
import java.util.ArrayList;
import java.util.List;
@@ -42,8 +41,8 @@ final class ProtoExtractor {
private ProtoExtractor() {
}
static List<DataSetField> extract(SchemaSubject subject, KafkaPath topicOddrn, boolean isKey) {
Descriptor schema = new ProtobufSchema(subject.getSchema()).toDescriptor();
static List<DataSetField> extract(ProtobufSchema protobufSchema, KafkaPath topicOddrn, boolean isKey) {
Descriptor schema = protobufSchema.toDescriptor();
List<DataSetField> result = new ArrayList<>();
result.add(DataSetFieldsExtractors.rootField(topicOddrn, isKey));
var rootOddrn = topicOddrn.oddrn() + "/columns/" + (isKey ? "key" : "value");

View file

@@ -51,6 +51,8 @@ import reactor.core.publisher.Mono;
@Slf4j
public class AccessControlService {
private static final String ACCESS_DENIED = "Access denied";
@Nullable
private final InMemoryReactiveClientRegistrationRepository clientRegistrationRepository;
private final RoleBasedAccessControlProperties properties;
@@ -97,6 +99,17 @@ public class AccessControlService {
return Mono.empty();
}
if (CollectionUtils.isNotEmpty(context.getApplicationConfigActions())) {
return getUser()
.doOnNext(user -> {
boolean accessGranted = isApplicationConfigAccessible(context, user);
if (!accessGranted) {
throw new AccessDeniedException(ACCESS_DENIED);
}
}).then();
}
return getUser()
.doOnNext(user -> {
boolean accessGranted =
@@ -113,7 +126,7 @@ public class AccessControlService {
&& isAuditAccessible(context, user);
if (!accessGranted) {
throw new AccessDeniedException("Access denied");
throw new AccessDeniedException(ACCESS_DENIED);
}
})
.then();

View file

@@ -0,0 +1,78 @@
package com.provectus.kafka.ui.service.rbac.extractor;
import com.provectus.kafka.ui.config.auth.LdapProperties;
import com.provectus.kafka.ui.model.rbac.Role;
import com.provectus.kafka.ui.model.rbac.provider.Provider;
import com.provectus.kafka.ui.service.rbac.AccessControlService;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.ApplicationContext;
import org.springframework.ldap.core.DirContextOperations;
import org.springframework.ldap.core.support.BaseLdapPathContextSource;
import org.springframework.security.core.GrantedAuthority;
import org.springframework.security.core.authority.SimpleGrantedAuthority;
import org.springframework.security.ldap.userdetails.DefaultLdapAuthoritiesPopulator;
import org.springframework.util.Assert;
@Slf4j
public class RbacLdapAuthoritiesExtractor extends DefaultLdapAuthoritiesPopulator {
private final AccessControlService acs;
private final LdapProperties props;
public RbacLdapAuthoritiesExtractor(ApplicationContext context,
BaseLdapPathContextSource contextSource, String groupFilterSearchBase) {
super(contextSource, groupFilterSearchBase);
this.acs = context.getBean(AccessControlService.class);
this.props = context.getBean(LdapProperties.class);
}
@Override
protected Set<GrantedAuthority> getAdditionalRoles(DirContextOperations user, String username) {
var ldapGroups = getRoles(user.getNameInNamespace(), username);
return acs.getRoles()
.stream()
.filter(r -> r.getSubjects()
.stream()
.filter(subject -> subject.getProvider().equals(Provider.LDAP))
.filter(subject -> subject.getType().equals("group"))
.anyMatch(subject -> ldapGroups.contains(subject.getValue()))
)
.map(Role::getName)
.peek(role -> log.trace("Mapped role [{}] for user [{}]", role, username))
.map(SimpleGrantedAuthority::new)
.collect(Collectors.toSet());
}
private Set<String> getRoles(String userDn, String username) {
var groupSearchBase = props.getGroupFilterSearchBase();
Assert.notNull(groupSearchBase, "groupSearchBase is empty");
var groupRoleAttribute = props.getGroupRoleAttribute();
if (groupRoleAttribute == null) {
groupRoleAttribute = "cn";
}
log.trace(
"Searching for roles for user [{}] with DN [{}], groupRoleAttribute [{}] and filter [{}] in search base [{}]",
username, userDn, groupRoleAttribute, getGroupSearchFilter(), groupSearchBase);
var ldapTemplate = getLdapTemplate();
ldapTemplate.setIgnoreNameNotFoundException(true);
Set<Map<String, List<String>>> userRoles = ldapTemplate.searchForMultipleAttributeValues(
groupSearchBase, getGroupSearchFilter(), new String[] {userDn, username},
new String[] {groupRoleAttribute});
return userRoles.stream()
.map(record -> record.get(getGroupRoleAttribute()).get(0))
.peek(group -> log.trace("Found LDAP group [{}] for user [{}]", group, username))
.collect(Collectors.toSet());
}
}
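A sketch of how this populator might be handed to the LDAP authentication setup; the application context, context source, search base and subtree flag are assumptions, not values taken from this commit:
// Illustrative only: construct the extractor and tune inherited populator settings.
var populator = new RbacLdapAuthoritiesExtractor(applicationContext, contextSource, "ou=groups");
populator.setSearchSubtree(true); // inherited from DefaultLdapAuthoritiesPopulator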

View file

@@ -0,0 +1,82 @@
package com.provectus.kafka.ui.util;
import static lombok.AccessLevel.PRIVATE;
import com.google.common.annotations.VisibleForTesting;
import com.provectus.kafka.ui.emitter.PolledRecords;
import com.provectus.kafka.ui.model.KafkaCluster;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.DistributionSummary;
import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Metrics;
import io.micrometer.core.instrument.Timer;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
import java.util.concurrent.atomic.AtomicInteger;
import lombok.RequiredArgsConstructor;
@RequiredArgsConstructor(access = PRIVATE)
public class ApplicationMetrics {
private final String clusterName;
private final MeterRegistry registry;
public static ApplicationMetrics forCluster(KafkaCluster cluster) {
return new ApplicationMetrics(cluster.getName(), Metrics.globalRegistry);
}
@VisibleForTesting
public static ApplicationMetrics noop() {
return new ApplicationMetrics("noop", new SimpleMeterRegistry());
}
public void meterPolledRecords(String topic, PolledRecords polled, boolean throttled) {
pollTimer(topic).record(polled.elapsed());
polledRecords(topic).increment(polled.count());
polledBytes(topic).record(polled.bytes());
if (throttled) {
pollThrottlingActivations().increment();
}
}
private Counter polledRecords(String topic) {
return Counter.builder("topic_records_polled")
.description("Number of records polled from topic")
.tag("cluster", clusterName)
.tag("topic", topic)
.register(registry);
}
private DistributionSummary polledBytes(String topic) {
return DistributionSummary.builder("topic_polled_bytes")
.description("Bytes polled from kafka topic")
.tag("cluster", clusterName)
.tag("topic", topic)
.register(registry);
}
private Timer pollTimer(String topic) {
return Timer.builder("topic_poll_time")
.description("Time spend in polling for topic")
.tag("cluster", clusterName)
.tag("topic", topic)
.register(registry);
}
private Counter pollThrottlingActivations() {
return Counter.builder("poll_throttling_activations")
.description("Number of poll throttling activations")
.tag("cluster", clusterName)
.register(registry);
}
public AtomicInteger activeConsumers() {
var count = new AtomicInteger();
Gauge.builder("active_consumers", () -> count)
.description("Number of active consumers")
.tag("cluster", clusterName)
.register(registry);
return count;
}
}
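A minimal usage sketch; the cluster and the PolledRecords instance are assumed to come from the surrounding consumer/emitter code:
// Illustrative only: report one poll and bump the active-consumer gauge.
ApplicationMetrics metrics = ApplicationMetrics.forCluster(cluster);
metrics.meterPolledRecords("orders", polledRecords, false /* throttled */);
AtomicInteger activeConsumers = metrics.activeConsumers();
activeConsumers.incrementAndGet();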

View file

@@ -1,29 +0,0 @@
package com.provectus.kafka.ui.util;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.utils.Bytes;
public class ConsumerRecordsUtil {
public static int calculatePolledRecSize(ConsumerRecord<Bytes, Bytes> rec) {
int polledBytes = 0;
for (Header header : rec.headers()) {
polledBytes +=
(header.key() != null ? header.key().getBytes().length : 0)
+ (header.value() != null ? header.value().length : 0);
}
polledBytes += rec.key() == null ? 0 : rec.serializedKeySize();
polledBytes += rec.value() == null ? 0 : rec.serializedValueSize();
return polledBytes;
}
public static int calculatePolledSize(Iterable<ConsumerRecord<Bytes, Bytes>> recs) {
int polledBytes = 0;
for (ConsumerRecord<Bytes, Bytes> rec : recs) {
polledBytes += calculatePolledRecSize(rec);
}
return polledBytes;
}
}

View file

@@ -28,6 +28,9 @@ import reactor.netty.http.client.HttpClient;
public class WebClientConfigurator {
private final WebClient.Builder builder = WebClient.builder();
private HttpClient httpClient = HttpClient
.create()
.proxyWithSystemProperties();
public WebClientConfigurator() {
configureObjectMapper(defaultOM());
@@ -90,12 +93,7 @@ public class WebClientConfigurator {
// Create webclient
SslContext context = contextBuilder.build();
var httpClient = HttpClient
.create()
.secure(t -> t.sslContext(context))
.proxyWithSystemProperties();
builder.clientConnector(new ReactorClientHttpConnector(httpClient));
httpClient = httpClient.secure(t -> t.sslContext(context));
return this;
}
@@ -131,6 +129,6 @@ public class WebClientConfigurator {
}
public WebClient build() {
return builder.build();
return builder.clientConnector(new ReactorClientHttpConnector(httpClient)).build();
}
}
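The refactor keeps a single Reactor Netty HttpClient, applies the system-property proxy settings up front, layers TLS on later if configured, and only attaches the connector in build(). A self-contained sketch of the underlying calls (illustrative, not this class):
import org.springframework.http.client.reactive.ReactorClientHttpConnector;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.netty.http.client.HttpClient;
class ProxyAwareWebClientSketch {
  WebClient build() {
    HttpClient http = HttpClient.create()
        .proxyWithSystemProperties(); // honors -Dhttp(s).proxyHost / proxyPort / nonProxyHosts
    // TLS, when configured, composes onto the same client:
    // http = http.secure(spec -> spec.sslContext(sslContext));
    return WebClient.builder()
        .clientConnector(new ReactorClientHttpConnector(http))
        .build();
  }
}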

View file

@@ -10,7 +10,7 @@ management:
endpoints:
web:
exposure:
include: "info,health"
include: "info,health,prometheus"
logging:
level:

View file

@@ -1,6 +1,7 @@
package com.provectus.kafka.ui;
import com.provectus.kafka.ui.container.KafkaConnectContainer;
import com.provectus.kafka.ui.container.KsqlDbContainer;
import com.provectus.kafka.ui.container.SchemaRegistryContainer;
import java.nio.file.Path;
import java.util.List;
@@ -32,7 +33,7 @@ public abstract class AbstractIntegrationTest {
public static final String LOCAL = "local";
public static final String SECOND_LOCAL = "secondLocal";
private static final String CONFLUENT_PLATFORM_VERSION = "5.5.0";
private static final String CONFLUENT_PLATFORM_VERSION = "7.2.1"; // Append ".arm64" for a local run
public static final KafkaContainer kafka = new KafkaContainer(
DockerImageName.parse("confluentinc/cp-kafka").withTag(CONFLUENT_PLATFORM_VERSION))
@@ -49,6 +50,11 @@ public abstract class AbstractIntegrationTest {
.dependsOn(kafka)
.dependsOn(schemaRegistry);
protected static final KsqlDbContainer KSQL_DB = new KsqlDbContainer(
DockerImageName.parse("confluentinc/cp-ksqldb-server")
.withTag(CONFLUENT_PLATFORM_VERSION))
.withKafka(kafka);
@TempDir
public static Path tmpDir;
@@ -71,6 +77,8 @@ public abstract class AbstractIntegrationTest {
System.setProperty("kafka.clusters.0.kafkaConnect.0.userName", "kafka-connect");
System.setProperty("kafka.clusters.0.kafkaConnect.0.password", "kafka-connect");
System.setProperty("kafka.clusters.0.kafkaConnect.0.address", kafkaConnect.getTarget());
System.setProperty("kafka.clusters.0.kafkaConnect.1.name", "notavailable");
System.setProperty("kafka.clusters.0.kafkaConnect.1.address", "http://notavailable:6666");
System.setProperty("kafka.clusters.0.masking.0.type", "REPLACE");
System.setProperty("kafka.clusters.0.masking.0.replacement", "***");
System.setProperty("kafka.clusters.0.masking.0.topicValuesPattern", "masking-test-.*");

View file

@@ -2,6 +2,7 @@ package com.provectus.kafka.ui;
import com.provectus.kafka.ui.model.CompatibilityLevelDTO;
import com.provectus.kafka.ui.model.NewSchemaSubjectDTO;
import com.provectus.kafka.ui.model.SchemaReferenceDTO;
import com.provectus.kafka.ui.model.SchemaSubjectDTO;
import com.provectus.kafka.ui.model.SchemaSubjectsResponseDTO;
import com.provectus.kafka.ui.model.SchemaTypeDTO;
@@ -190,6 +191,58 @@ class SchemaRegistryServiceTests extends AbstractIntegrationTest {
Assertions.assertEquals(schema, actual.getSchema());
}
@Test
void shouldCreateNewProtobufSchemaWithRefs() {
NewSchemaSubjectDTO requestBody = new NewSchemaSubjectDTO()
.schemaType(SchemaTypeDTO.PROTOBUF)
.subject(subject + "-ref")
.schema("""
syntax = "proto3";
message MyRecord {
int32 id = 1;
string name = 2;
}
""");
webTestClient
.post()
.uri("/api/clusters/{clusterName}/schemas", LOCAL)
.contentType(MediaType.APPLICATION_JSON)
.body(BodyInserters.fromPublisher(Mono.just(requestBody), NewSchemaSubjectDTO.class))
.exchange()
.expectStatus()
.isOk();
requestBody = new NewSchemaSubjectDTO()
.schemaType(SchemaTypeDTO.PROTOBUF)
.subject(subject)
.schema("""
syntax = "proto3";
import "MyRecord.proto";
message MyRecordWithRef {
int32 id = 1;
MyRecord my_ref = 2;
}
""")
.references(List.of(new SchemaReferenceDTO().name("MyRecord.proto").subject(subject + "-ref").version(1)));
SchemaSubjectDTO actual = webTestClient
.post()
.uri("/api/clusters/{clusterName}/schemas", LOCAL)
.contentType(MediaType.APPLICATION_JSON)
.body(BodyInserters.fromPublisher(Mono.just(requestBody), NewSchemaSubjectDTO.class))
.exchange()
.expectStatus()
.isOk()
.expectBody(SchemaSubjectDTO.class)
.returnResult()
.getResponseBody();
Assertions.assertNotNull(actual);
Assertions.assertEquals(requestBody.getReferences(), actual.getReferences());
}
@Test
public void shouldReturnBackwardAsGlobalCompatibilityLevelByDefault() {
webTestClient
@@ -278,7 +331,7 @@ class SchemaRegistryServiceTests extends AbstractIntegrationTest {
void shouldCreateNewSchemaWhenSubjectIncludesNonAsciiCharacters() {
String schema =
"{\"subject\":\"test/test\",\"schemaType\":\"JSON\",\"schema\":"
+ "\"{\\\"type\\\": \\\"string\\\"}\"}";
+ "\"{\\\"type\\\": \\\"string\\\"}\"}";
webTestClient
.post()

View file

@@ -0,0 +1,80 @@
package com.provectus.kafka.ui.serdes.builtin;
import static org.assertj.core.api.Assertions.assertThat;
import com.provectus.kafka.ui.serde.api.DeserializeResult;
import com.provectus.kafka.ui.serde.api.Serde;
import com.provectus.kafka.ui.serdes.PropertyResolverImpl;
import com.provectus.kafka.ui.serdes.RecordHeadersImpl;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
import org.junit.jupiter.params.provider.EnumSource;
public class HexSerdeTest {
private static final byte[] TEST_BYTES = "hello world".getBytes();
private static final String TEST_BYTES_HEX_ENCODED = "68 65 6C 6C 6F 20 77 6F 72 6C 64";
private HexSerde hexSerde;
@BeforeEach
void init() {
hexSerde = new HexSerde();
hexSerde.autoConfigure(PropertyResolverImpl.empty(), PropertyResolverImpl.empty());
}
@ParameterizedTest
@CsvSource({
"68656C6C6F20776F726C64", // uppercase
"68656c6c6f20776f726c64", // lowercase
"68:65:6c:6c:6f:20:77:6f:72:6c:64", // ':' delim
"68 65 6C 6C 6F 20 77 6F 72 6C 64", // space delim, UC
"68 65 6c 6c 6f 20 77 6f 72 6c 64", // space delim, LC
"#68 #65 #6C #6C #6F #20 #77 #6F #72 #6C #64" // '#' prefix, space delim
})
void serializesInputAsHexString(String hexString) {
for (Serde.Target type : Serde.Target.values()) {
var serializer = hexSerde.serializer("anyTopic", type);
byte[] bytes = serializer.serialize(hexString);
assertThat(bytes).isEqualTo(TEST_BYTES);
}
}
@ParameterizedTest
@EnumSource
void serializesEmptyStringAsEmptyBytesArray(Serde.Target type) {
var serializer = hexSerde.serializer("anyTopic", type);
byte[] bytes = serializer.serialize("");
assertThat(bytes).isEqualTo(new byte[] {});
}
@ParameterizedTest
@EnumSource
void deserializesDataAsHexBytes(Serde.Target type) {
var deserializer = hexSerde.deserializer("anyTopic", type);
var result = deserializer.deserialize(new RecordHeadersImpl(), TEST_BYTES);
assertThat(result.getResult()).isEqualTo(TEST_BYTES_HEX_ENCODED);
assertThat(result.getType()).isEqualTo(DeserializeResult.Type.STRING);
assertThat(result.getAdditionalProperties()).isEmpty();
}
@ParameterizedTest
@EnumSource
void getSchemaReturnsEmpty(Serde.Target type) {
assertThat(hexSerde.getSchema("anyTopic", type)).isEmpty();
}
@ParameterizedTest
@EnumSource
void canDeserializeReturnsTrueForAllInputs(Serde.Target type) {
assertThat(hexSerde.canDeserialize("anyTopic", type)).isTrue();
}
@ParameterizedTest
@EnumSource
void canSerializeReturnsTrueForAllInput(Serde.Target type) {
assertThat(hexSerde.canSerialize("anyTopic", type)).isTrue();
}
}

View file

@@ -8,9 +8,11 @@ import static org.assertj.core.api.Assertions.assertThat;
import com.provectus.kafka.ui.AbstractIntegrationTest;
import com.provectus.kafka.ui.emitter.BackwardRecordEmitter;
import com.provectus.kafka.ui.emitter.EnhancedConsumer;
import com.provectus.kafka.ui.emitter.ForwardRecordEmitter;
import com.provectus.kafka.ui.emitter.MessagesProcessing;
import com.provectus.kafka.ui.emitter.PollingSettings;
import com.provectus.kafka.ui.emitter.PollingThrottler;
import com.provectus.kafka.ui.model.ConsumerPosition;
import com.provectus.kafka.ui.model.TopicMessageEventDTO;
import com.provectus.kafka.ui.producer.KafkaTestProducer;
@@ -18,6 +20,7 @@ import com.provectus.kafka.ui.serde.api.Serde;
import com.provectus.kafka.ui.serdes.ConsumerRecordDeserializer;
import com.provectus.kafka.ui.serdes.PropertyResolverImpl;
import com.provectus.kafka.ui.serdes.builtin.StringSerde;
import com.provectus.kafka.ui.util.ApplicationMetrics;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
@@ -38,7 +41,6 @@ import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.serialization.BytesDeserializer;
import org.apache.kafka.common.utils.Bytes;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
@@ -325,22 +327,20 @@ class RecordEmitterTest extends AbstractIntegrationTest {
assertionsConsumer.accept(step.expectComplete().verifyThenAssertThat());
}
private KafkaConsumer<Bytes, Bytes> createConsumer() {
private EnhancedConsumer createConsumer() {
return createConsumer(Map.of());
}
private KafkaConsumer<Bytes, Bytes> createConsumer(Map<String, Object> properties) {
private EnhancedConsumer createConsumer(Map<String, Object> properties) {
final Map<String, ? extends Serializable> map = Map.of(
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers(),
ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString(),
ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 19, // to check multiple polls
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class,
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class
ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 19 // to check multiple polls
);
Properties props = new Properties();
props.putAll(map);
props.putAll(properties);
return new KafkaConsumer<>(props);
return new EnhancedConsumer(props, PollingThrottler.noop(), ApplicationMetrics.noop());
}
@Value

View file

@@ -4,16 +4,21 @@ import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import com.provectus.kafka.ui.model.CreateConsumerAclDTO;
import com.provectus.kafka.ui.model.CreateProducerAclDTO;
import com.provectus.kafka.ui.model.CreateStreamAppAclDTO;
import com.provectus.kafka.ui.model.KafkaCluster;
import com.provectus.kafka.ui.service.AdminClientService;
import com.provectus.kafka.ui.service.ReactiveAdminClient;
import java.util.Collection;
import java.util.List;
import java.util.UUID;
import org.apache.kafka.common.acl.AccessControlEntry;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.acl.AclPermissionType;
import org.apache.kafka.common.resource.PatternType;
import org.apache.kafka.common.resource.Resource;
import org.apache.kafka.common.resource.ResourcePattern;
import org.apache.kafka.common.resource.ResourcePatternFilter;
import org.apache.kafka.common.resource.ResourceType;
@@ -53,12 +58,12 @@ class AclsServiceTest {
when(adminClientMock.listAcls(ResourcePatternFilter.ANY))
.thenReturn(Mono.just(List.of(existingBinding1, existingBinding2)));
ArgumentCaptor<?> createdCaptor = ArgumentCaptor.forClass(Collection.class);
when(adminClientMock.createAcls((Collection<AclBinding>) createdCaptor.capture()))
ArgumentCaptor<Collection<AclBinding>> createdCaptor = ArgumentCaptor.forClass(Collection.class);
when(adminClientMock.createAcls(createdCaptor.capture()))
.thenReturn(Mono.empty());
ArgumentCaptor<?> deletedCaptor = ArgumentCaptor.forClass(Collection.class);
when(adminClientMock.deleteAcls((Collection<AclBinding>) deletedCaptor.capture()))
ArgumentCaptor<Collection<AclBinding>> deletedCaptor = ArgumentCaptor.forClass(Collection.class);
when(adminClientMock.deleteAcls(deletedCaptor.capture()))
.thenReturn(Mono.empty());
aclsService.syncAclWithAclCsv(
@@ -68,15 +73,218 @@ class AclsServiceTest {
+ "User:test3,GROUP,PREFIXED,groupNew,DESCRIBE,DENY,localhost"
).block();
Collection<AclBinding> createdBindings = (Collection<AclBinding>) createdCaptor.getValue();
Collection<AclBinding> createdBindings = createdCaptor.getValue();
assertThat(createdBindings)
.hasSize(1)
.contains(newBindingToBeAdded);
Collection<AclBinding> deletedBindings = (Collection<AclBinding>) deletedCaptor.getValue();
Collection<AclBinding> deletedBindings = deletedCaptor.getValue();
assertThat(deletedBindings)
.hasSize(1)
.contains(existingBinding2);
}
@Test
void createsConsumerDependantAcls() {
ArgumentCaptor<Collection<AclBinding>> createdCaptor = ArgumentCaptor.forClass(Collection.class);
when(adminClientMock.createAcls(createdCaptor.capture()))
.thenReturn(Mono.empty());
var principal = UUID.randomUUID().toString();
var host = UUID.randomUUID().toString();
aclsService.createConsumerAcl(
CLUSTER,
new CreateConsumerAclDTO()
.principal(principal)
.host(host)
.consumerGroups(List.of("cg1", "cg2"))
.topics(List.of("t1", "t2"))
).block();
//Read, Describe on topics, Read on consumerGroups
Collection<AclBinding> createdBindings = createdCaptor.getValue();
assertThat(createdBindings)
.hasSize(6)
.contains(new AclBinding(
new ResourcePattern(ResourceType.TOPIC, "t1", PatternType.LITERAL),
new AccessControlEntry(principal, host, AclOperation.READ, AclPermissionType.ALLOW)))
.contains(new AclBinding(
new ResourcePattern(ResourceType.TOPIC, "t1", PatternType.LITERAL),
new AccessControlEntry(principal, host, AclOperation.DESCRIBE, AclPermissionType.ALLOW)))
.contains(new AclBinding(
new ResourcePattern(ResourceType.TOPIC, "t2", PatternType.LITERAL),
new AccessControlEntry(principal, host, AclOperation.READ, AclPermissionType.ALLOW)))
.contains(new AclBinding(
new ResourcePattern(ResourceType.TOPIC, "t2", PatternType.LITERAL),
new AccessControlEntry(principal, host, AclOperation.DESCRIBE, AclPermissionType.ALLOW)))
.contains(new AclBinding(
new ResourcePattern(ResourceType.GROUP, "cg1", PatternType.LITERAL),
new AccessControlEntry(principal, host, AclOperation.READ, AclPermissionType.ALLOW)))
.contains(new AclBinding(
new ResourcePattern(ResourceType.GROUP, "cg2", PatternType.LITERAL),
new AccessControlEntry(principal, host, AclOperation.READ, AclPermissionType.ALLOW)));
}
@Test
void createsConsumerDependantAclsWhenTopicsAndGroupsSpecifiedByPrefix() {
ArgumentCaptor<Collection<AclBinding>> createdCaptor = ArgumentCaptor.forClass(Collection.class);
when(adminClientMock.createAcls(createdCaptor.capture()))
.thenReturn(Mono.empty());
var principal = UUID.randomUUID().toString();
var host = UUID.randomUUID().toString();
aclsService.createConsumerAcl(
CLUSTER,
new CreateConsumerAclDTO()
.principal(principal)
.host(host)
.consumerGroupsPrefix("cgPref")
.topicsPrefix("topicPref")
).block();
//Read, Describe on topics, Read on consumerGroups
Collection<AclBinding> createdBindings = createdCaptor.getValue();
assertThat(createdBindings)
.hasSize(3)
.contains(new AclBinding(
new ResourcePattern(ResourceType.TOPIC, "topicPref", PatternType.PREFIXED),
new AccessControlEntry(principal, host, AclOperation.READ, AclPermissionType.ALLOW)))
.contains(new AclBinding(
new ResourcePattern(ResourceType.TOPIC, "topicPref", PatternType.PREFIXED),
new AccessControlEntry(principal, host, AclOperation.DESCRIBE, AclPermissionType.ALLOW)))
.contains(new AclBinding(
new ResourcePattern(ResourceType.GROUP, "cgPref", PatternType.PREFIXED),
new AccessControlEntry(principal, host, AclOperation.READ, AclPermissionType.ALLOW)));
}
@Test
void createsProducerDependantAcls() {
ArgumentCaptor<Collection<AclBinding>> createdCaptor = ArgumentCaptor.forClass(Collection.class);
when(adminClientMock.createAcls(createdCaptor.capture()))
.thenReturn(Mono.empty());
var principal = UUID.randomUUID().toString();
var host = UUID.randomUUID().toString();
aclsService.createProducerAcl(
CLUSTER,
new CreateProducerAclDTO()
.principal(principal)
.host(host)
.topics(List.of("t1"))
.idempotent(true)
.transactionalId("txId1")
).block();
//Write, Describe, Create permission on topics, Write, Describe on transactionalIds
//IDEMPOTENT_WRITE on cluster if idempotent is enabled (true)
Collection<AclBinding> createdBindings = createdCaptor.getValue();
assertThat(createdBindings)
.hasSize(6)
.contains(new AclBinding(
new ResourcePattern(ResourceType.TOPIC, "t1", PatternType.LITERAL),
new AccessControlEntry(principal, host, AclOperation.WRITE, AclPermissionType.ALLOW)))
.contains(new AclBinding(
new ResourcePattern(ResourceType.TOPIC, "t1", PatternType.LITERAL),
new AccessControlEntry(principal, host, AclOperation.DESCRIBE, AclPermissionType.ALLOW)))
.contains(new AclBinding(
new ResourcePattern(ResourceType.TOPIC, "t1", PatternType.LITERAL),
new AccessControlEntry(principal, host, AclOperation.CREATE, AclPermissionType.ALLOW)))
.contains(new AclBinding(
new ResourcePattern(ResourceType.TRANSACTIONAL_ID, "txId1", PatternType.LITERAL),
new AccessControlEntry(principal, host, AclOperation.WRITE, AclPermissionType.ALLOW)))
.contains(new AclBinding(
new ResourcePattern(ResourceType.TRANSACTIONAL_ID, "txId1", PatternType.LITERAL),
new AccessControlEntry(principal, host, AclOperation.DESCRIBE, AclPermissionType.ALLOW)))
.contains(new AclBinding(
new ResourcePattern(ResourceType.CLUSTER, Resource.CLUSTER_NAME, PatternType.LITERAL),
new AccessControlEntry(principal, host, AclOperation.IDEMPOTENT_WRITE, AclPermissionType.ALLOW)));
}
@Test
void createsProducerDependantAclsWhenTopicsAndTxIdSpecifiedByPrefix() {
ArgumentCaptor<Collection<AclBinding>> createdCaptor = ArgumentCaptor.forClass(Collection.class);
when(adminClientMock.createAcls(createdCaptor.capture()))
.thenReturn(Mono.empty());
var principal = UUID.randomUUID().toString();
var host = UUID.randomUUID().toString();
aclsService.createProducerAcl(
CLUSTER,
new CreateProducerAclDTO()
.principal(principal)
.host(host)
.topicsPrefix("topicPref")
.transactionsIdPrefix("txIdPref")
.idempotent(false)
).block();
//Write, Describe, Create permission on topics, Write, Describe on transactionalIds
//IDEMPOTENT_WRITE on cluster if idempotent is enabled (false)
Collection<AclBinding> createdBindings = createdCaptor.getValue();
assertThat(createdBindings)
.hasSize(5)
.contains(new AclBinding(
new ResourcePattern(ResourceType.TOPIC, "topicPref", PatternType.PREFIXED),
new AccessControlEntry(principal, host, AclOperation.WRITE, AclPermissionType.ALLOW)))
.contains(new AclBinding(
new ResourcePattern(ResourceType.TOPIC, "topicPref", PatternType.PREFIXED),
new AccessControlEntry(principal, host, AclOperation.DESCRIBE, AclPermissionType.ALLOW)))
.contains(new AclBinding(
new ResourcePattern(ResourceType.TOPIC, "topicPref", PatternType.PREFIXED),
new AccessControlEntry(principal, host, AclOperation.CREATE, AclPermissionType.ALLOW)))
.contains(new AclBinding(
new ResourcePattern(ResourceType.TRANSACTIONAL_ID, "txIdPref", PatternType.PREFIXED),
new AccessControlEntry(principal, host, AclOperation.WRITE, AclPermissionType.ALLOW)))
.contains(new AclBinding(
new ResourcePattern(ResourceType.TRANSACTIONAL_ID, "txIdPref", PatternType.PREFIXED),
new AccessControlEntry(principal, host, AclOperation.DESCRIBE, AclPermissionType.ALLOW)));
}
@Test
void createsStreamAppDependantAcls() {
ArgumentCaptor<Collection<AclBinding>> createdCaptor = ArgumentCaptor.forClass(Collection.class);
when(adminClientMock.createAcls(createdCaptor.capture()))
.thenReturn(Mono.empty());
var principal = UUID.randomUUID().toString();
var host = UUID.randomUUID().toString();
aclsService.createStreamAppAcl(
CLUSTER,
new CreateStreamAppAclDTO()
.principal(principal)
.host(host)
.inputTopics(List.of("t1"))
.outputTopics(List.of("t2", "t3"))
.applicationId("appId1")
).block();
// Read on input topics, Write on output topics
// ALL on applicationId-prefixed Groups and Topics
Collection<AclBinding> createdBindings = createdCaptor.getValue();
assertThat(createdBindings)
.hasSize(5)
.contains(new AclBinding(
new ResourcePattern(ResourceType.TOPIC, "t1", PatternType.LITERAL),
new AccessControlEntry(principal, host, AclOperation.READ, AclPermissionType.ALLOW)))
.contains(new AclBinding(
new ResourcePattern(ResourceType.TOPIC, "t2", PatternType.LITERAL),
new AccessControlEntry(principal, host, AclOperation.WRITE, AclPermissionType.ALLOW)))
.contains(new AclBinding(
new ResourcePattern(ResourceType.TOPIC, "t3", PatternType.LITERAL),
new AccessControlEntry(principal, host, AclOperation.WRITE, AclPermissionType.ALLOW)))
.contains(new AclBinding(
new ResourcePattern(ResourceType.GROUP, "appId1", PatternType.PREFIXED),
new AccessControlEntry(principal, host, AclOperation.ALL, AclPermissionType.ALLOW)))
.contains(new AclBinding(
new ResourcePattern(ResourceType.TOPIC, "appId1", PatternType.PREFIXED),
new AccessControlEntry(principal, host, AclOperation.ALL, AclPermissionType.ALLOW)));
}
}

View file

@@ -81,7 +81,7 @@ class AuditServiceTest {
@Test
void noWriterIfNoAuditPropsSet() {
var maybeWriter = createAuditWriter(cluster, adminClientMock, producerSupplierMock);
var maybeWriter = createAuditWriter(cluster, () -> adminClientMock, producerSupplierMock);
assertThat(maybeWriter).isEmpty();
}
@ -91,7 +91,7 @@ class AuditServiceTest {
auditProps.setConsoleAuditEnabled(true);
clustersProperties.setAudit(auditProps);
var maybeWriter = createAuditWriter(cluster, adminClientMock, producerSupplierMock);
var maybeWriter = createAuditWriter(cluster, () -> adminClientMock, producerSupplierMock);
assertThat(maybeWriter).isPresent();
var writer = maybeWriter.get();
@ -116,7 +116,7 @@ class AuditServiceTest {
when(adminClientMock.listTopics(true))
.thenReturn(Mono.just(Set.of("test_audit_topic")));
var maybeWriter = createAuditWriter(cluster, adminClientMock, producerSupplierMock);
var maybeWriter = createAuditWriter(cluster, () -> adminClientMock, producerSupplierMock);
assertThat(maybeWriter).isPresent();
//checking there was no topic creation request
@ -136,7 +136,7 @@ class AuditServiceTest {
when(adminClientMock.createTopic(eq("test_audit_topic"), eq(3), eq(null), anyMap()))
.thenReturn(Mono.empty());
var maybeWriter = createAuditWriter(cluster, adminClientMock, producerSupplierMock);
var maybeWriter = createAuditWriter(cluster, () -> adminClientMock, producerSupplierMock);
assertThat(maybeWriter).isPresent();
//verifying topic created

View file

@ -61,7 +61,7 @@ class ConnectorsExporterTest {
when(kafkaConnectService.getConnects(CLUSTER))
.thenReturn(Flux.just(connect));
when(kafkaConnectService.getConnectorNames(CLUSTER, connect.getName()))
when(kafkaConnectService.getConnectorNamesWithErrorsSuppress(CLUSTER, connect.getName()))
.thenReturn(Flux.just(sinkConnector.getName(), sourceConnector.getName()));
when(kafkaConnectService.getConnector(CLUSTER, connect.getName(), sinkConnector.getName()))

View file

@ -0,0 +1,86 @@
package com.provectus.kafka.ui.service.integration.odd;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import com.google.common.collect.ImmutableMap;
import com.provectus.kafka.ui.sr.api.KafkaSrClientApi;
import com.provectus.kafka.ui.sr.model.SchemaReference;
import com.provectus.kafka.ui.sr.model.SchemaSubject;
import java.util.List;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
class SchemaReferencesResolverTest {
private final KafkaSrClientApi srClientMock = mock(KafkaSrClientApi.class);
private final SchemaReferencesResolver schemaReferencesResolver = new SchemaReferencesResolver(srClientMock);
@Test
void resolvesRefsUsingSrClient() {
mockSrCall("sub1", 1,
new SchemaSubject()
.schema("schema1"));
mockSrCall("sub2", 1,
new SchemaSubject()
.schema("schema2")
.references(
List.of(
new SchemaReference().name("ref2_1").subject("sub2_1").version(2),
new SchemaReference().name("ref2_2").subject("sub1").version(1))));
mockSrCall("sub2_1", 2,
new SchemaSubject()
.schema("schema2_1")
.references(
List.of(
new SchemaReference().name("ref2_1_1").subject("sub2_1_1").version(3),
new SchemaReference().name("ref1").subject("should_not_be_called").version(1)
))
);
mockSrCall("sub2_1_1", 3,
new SchemaSubject()
.schema("schema2_1_1"));
var resolvedRefsMono = schemaReferencesResolver.resolve(
List.of(
new SchemaReference().name("ref1").subject("sub1").version(1),
new SchemaReference().name("ref2").subject("sub2").version(1)));
StepVerifier.create(resolvedRefsMono)
.assertNext(refs ->
assertThat(refs)
.containsExactlyEntriesOf(
// the resulting map must preserve resolution order
ImmutableMap.<String, String>builder()
.put("ref1", "schema1")
.put("ref2_1_1", "schema2_1_1")
.put("ref2_1", "schema2_1")
.put("ref2_2", "schema1")
.put("ref2", "schema2")
.build()))
.verifyComplete();
}
@Test
void returnsEmptyMapOnEmptyInputs() {
StepVerifier.create(schemaReferencesResolver.resolve(null))
.assertNext(map -> assertThat(map).isEmpty())
.verifyComplete();
StepVerifier.create(schemaReferencesResolver.resolve(List.of()))
.assertNext(map -> assertThat(map).isEmpty())
.verifyComplete();
}
private void mockSrCall(String subject, int version, SchemaSubject subjectToReturn) {
when(srClientMock.getSubjectVersion(subject, version + "", true))
.thenReturn(Mono.just(subjectToReturn));
}
}
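The ordering assertion above implies a depth-first traversal that resolves a reference's own references before the reference itself and skips names that were already resolved (hence the "should_not_be_called" subject). A minimal sketch of such a traversal, assuming a blocking lookup in place of the reactive client and the generated getters on the model classes:

import com.provectus.kafka.ui.sr.model.SchemaReference;
import com.provectus.kafka.ui.sr.model.SchemaSubject;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.function.BiFunction;

class ReferenceResolutionSketch {
  // fills 'resolved' (reference name -> schema text) in the order the test asserts
  static void resolve(List<SchemaReference> refs,
                      BiFunction<String, Integer, SchemaSubject> lookup,
                      LinkedHashMap<String, String> resolved) {
    for (SchemaReference ref : refs) {
      if (resolved.containsKey(ref.getName())) {
        continue; // already resolved: do not fetch again
      }
      SchemaSubject subject = lookup.apply(ref.getSubject(), ref.getVersion());
      if (subject.getReferences() != null) {
        resolve(subject.getReferences(), lookup, resolved); // nested references first
      }
      resolved.put(ref.getName(), subject.getSchema());
    }
  }
}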

View file

@ -1,6 +1,7 @@
package com.provectus.kafka.ui.service.integration.odd;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@ -22,6 +23,8 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.opendatadiscovery.client.model.DataEntity;
import org.opendatadiscovery.client.model.DataEntityType;
import org.springframework.http.HttpHeaders;
import org.springframework.web.reactive.function.client.WebClientResponseException;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
@ -52,9 +55,8 @@ class TopicsExporterTest {
@Test
void doesNotExportTopicsWhichDontFitFiltrationRule() {
when(schemaRegistryClientMock.getSubjectVersion(anyString(), anyString()))
.thenReturn(Mono.error(new RuntimeException("Not found")));
when(schemaRegistryClientMock.getSubjectVersion(anyString(), anyString(), anyBoolean()))
.thenReturn(Mono.error(WebClientResponseException.create(404, "NF", new HttpHeaders(), null, null, null)));
stats = Statistics.empty()
.toBuilder()
.topicDescriptions(
@ -83,14 +85,14 @@ class TopicsExporterTest {
@Test
void doesExportTopicData() {
when(schemaRegistryClientMock.getSubjectVersion("testTopic-value", "latest"))
when(schemaRegistryClientMock.getSubjectVersion("testTopic-value", "latest", false))
.thenReturn(Mono.just(
new SchemaSubject()
.schema("\"string\"")
.schemaType(SchemaType.AVRO)
));
when(schemaRegistryClientMock.getSubjectVersion("testTopic-key", "latest"))
when(schemaRegistryClientMock.getSubjectVersion("testTopic-key", "latest", false))
.thenReturn(Mono.just(
new SchemaSubject()
.schema("\"int\"")

View file

@ -2,7 +2,7 @@ package com.provectus.kafka.ui.service.integration.odd.schema;
import static org.assertj.core.api.Assertions.assertThat;
import com.provectus.kafka.ui.sr.model.SchemaSubject;
import io.confluent.kafka.schemaregistry.avro.AvroSchema;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.opendatadiscovery.client.model.DataSetField;
@ -15,8 +15,7 @@ class AvroExtractorTest {
@ValueSource(booleans = {true, false})
void test(boolean isKey) {
var list = AvroExtractor.extract(
new SchemaSubject()
.schema("""
new AvroSchema("""
{
"type": "record",
"name": "Message",

View file

@ -2,7 +2,7 @@ package com.provectus.kafka.ui.service.integration.odd.schema;
import static org.assertj.core.api.Assertions.assertThat;
import com.provectus.kafka.ui.sr.model.SchemaSubject;
import io.confluent.kafka.schemaregistry.json.JsonSchema;
import java.net.URI;
import java.util.List;
import java.util.Map;
@ -40,7 +40,7 @@ class JsonSchemaExtractorTest {
}
""";
var fields = JsonSchemaExtractor.extract(
new SchemaSubject().schema(jsonSchema),
new JsonSchema(jsonSchema),
KafkaPath.builder()
.cluster("localhost:9092")
.topic("someTopic")

View file

@ -2,7 +2,7 @@ package com.provectus.kafka.ui.service.integration.odd.schema;
import static org.assertj.core.api.Assertions.assertThat;
import com.provectus.kafka.ui.sr.model.SchemaSubject;
import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.opendatadiscovery.client.model.DataSetField;
@ -54,8 +54,7 @@ class ProtoExtractorTest {
}""";
var list = ProtoExtractor.extract(
new SchemaSubject()
.schema(protoSchema),
new ProtobufSchema(protoSchema),
KafkaPath.builder()
.cluster("localhost:9092")
.topic("someTopic")

View file

@ -3,28 +3,22 @@ package com.provectus.kafka.ui.service.ksql;
import static org.assertj.core.api.Assertions.assertThat;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.DoubleNode;
import com.fasterxml.jackson.databind.node.DecimalNode;
import com.fasterxml.jackson.databind.node.IntNode;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.TextNode;
import com.provectus.kafka.ui.AbstractIntegrationTest;
import com.provectus.kafka.ui.container.KsqlDbContainer;
import java.math.BigDecimal;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.testcontainers.shaded.org.awaitility.Awaitility;
import org.testcontainers.utility.DockerImageName;
import reactor.test.StepVerifier;
class KsqlApiClientTest extends AbstractIntegrationTest {
private static final KsqlDbContainer KSQL_DB = new KsqlDbContainer(
DockerImageName.parse("confluentinc/ksqldb-server").withTag("0.24.0"))
.withKafka(kafka);
@BeforeAll
static void startContainer() {
KSQL_DB.start();
@ -72,7 +66,7 @@ class KsqlApiClientTest extends AbstractIntegrationTest {
private void assertLastKsqTutorialQueryResult(KsqlApiClient client) {
// expected results:
//{"header":"Schema","columnNames":[...],"values":null}
//{"header":"Row","columnNames":null,"values":[[0.0,["4ab5cbad","8b6eae59","4a7c7b41"],3]]}
//{"header":"Row","columnNames":null,"values":[[0,["4ab5cbad","8b6eae59","4a7c7b41"],3]]}
//{"header":"Row","columnNames":null,"values":[[10.0,["18f4ea86"],1]]}
StepVerifier.create(
client.execute(
@ -86,34 +80,26 @@ class KsqlApiClientTest extends AbstractIntegrationTest {
assertThat(header.getValues()).isNull();
})
.assertNext(row -> {
assertThat(row).isEqualTo(
KsqlApiClient.KsqlResponseTable.builder()
.header("Row")
.columnNames(null)
.values(List.of(List.of(
new DoubleNode(0.0),
new ArrayNode(JsonNodeFactory.instance)
.add(new TextNode("4ab5cbad"))
.add(new TextNode("8b6eae59"))
.add(new TextNode("4a7c7b41")),
new IntNode(3)
)))
.build()
);
var distance = (DecimalNode) row.getValues().get(0).get(0);
var riders = (ArrayNode) row.getValues().get(0).get(1);
var count = (IntNode) row.getValues().get(0).get(2);
assertThat(distance).isEqualTo(new DecimalNode(new BigDecimal(0)));
assertThat(riders).isEqualTo(new ArrayNode(JsonNodeFactory.instance)
.add(new TextNode("4ab5cbad"))
.add(new TextNode("8b6eae59"))
.add(new TextNode("4a7c7b41")));
assertThat(count).isEqualTo(new IntNode(3));
})
.assertNext(row -> {
assertThat(row).isEqualTo(
KsqlApiClient.KsqlResponseTable.builder()
.header("Row")
.columnNames(null)
.values(List.of(List.of(
new DoubleNode(10.0),
new ArrayNode(JsonNodeFactory.instance)
.add(new TextNode("18f4ea86")),
new IntNode(1)
)))
.build()
);
var distance = (DecimalNode) row.getValues().get(0).get(0);
var riders = (ArrayNode) row.getValues().get(0).get(1);
var count = (IntNode) row.getValues().get(0).get(2);
assertThat(distance).isEqualTo(new DecimalNode(new BigDecimal(10)));
assertThat(riders).isEqualTo(new ArrayNode(JsonNodeFactory.instance)
.add(new TextNode("18f4ea86")));
assertThat(count).isEqualTo(new IntNode(1));
})
.verifyComplete();
}
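The switch from prebuilt DoubleNode values to per-cell checks reflects that Jackson's numeric nodes only compare equal within the same node type, so a response parsed into DecimalNode cells would never equal a DoubleNode built in the test (assuming that is the reason for the change). A small sketch of that behaviour:

import com.fasterxml.jackson.databind.node.DecimalNode;
import com.fasterxml.jackson.databind.node.DoubleNode;
import java.math.BigDecimal;

class JacksonNumericNodeEquality {
  public static void main(String[] args) {
    // different node types are never equal, even for the same numeric value
    System.out.println(new DecimalNode(new BigDecimal("0")).equals(new DoubleNode(0.0))); // false
    // the same node type with an equal value compares equal
    System.out.println(new DecimalNode(new BigDecimal("0")).equals(new DecimalNode(BigDecimal.ZERO))); // true
  }
}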

View file

@ -3,7 +3,6 @@ package com.provectus.kafka.ui.service.ksql;
import static org.assertj.core.api.Assertions.assertThat;
import com.provectus.kafka.ui.AbstractIntegrationTest;
import com.provectus.kafka.ui.container.KsqlDbContainer;
import com.provectus.kafka.ui.model.KafkaCluster;
import com.provectus.kafka.ui.model.KsqlStreamDescriptionDTO;
import com.provectus.kafka.ui.model.KsqlTableDescriptionDTO;
@ -15,14 +14,9 @@ import java.util.concurrent.CopyOnWriteArraySet;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.testcontainers.utility.DockerImageName;
class KsqlServiceV2Test extends AbstractIntegrationTest {
private static final KsqlDbContainer KSQL_DB = new KsqlDbContainer(
DockerImageName.parse("confluentinc/ksqldb-server").withTag("0.24.0"))
.withKafka(kafka);
private static final Set<String> STREAMS_TO_DELETE = new CopyOnWriteArraySet<>();
private static final Set<String> TABLES_TO_DELETE = new CopyOnWriteArraySet<>();

View file

@ -77,6 +77,10 @@ paths:
required: true
schema:
type: string
- name: deleted
in: query
schema:
type: boolean
responses:
200:
description: OK
@ -317,6 +321,10 @@ components:
type: string
schemaType:
$ref: '#/components/schemas/SchemaType'
references:
type: array
items:
$ref: '#/components/schemas/SchemaReference'
required:
- id
- subject

View file

@ -1868,6 +1868,69 @@ paths:
404:
description: Acl not found
/api/clusters/{clusterName}/acl/consumer:
post:
tags:
- Acls
summary: createConsumerAcl
operationId: createConsumerAcl
parameters:
- name: clusterName
in: path
required: true
schema:
type: string
requestBody:
content:
application/json:
schema:
$ref: '#/components/schemas/CreateConsumerAcl'
responses:
200:
description: OK
/api/clusters/{clusterName}/acl/producer:
post:
tags:
- Acls
summary: createProducerAcl
operationId: createProducerAcl
parameters:
- name: clusterName
in: path
required: true
schema:
type: string
requestBody:
content:
application/json:
schema:
$ref: '#/components/schemas/CreateProducerAcl'
responses:
200:
description: OK
/api/clusters/{clusterName}/acl/streamApp:
post:
tags:
- Acls
summary: createStreamAppAcl
operationId: createStreamAppAcl
parameters:
- name: clusterName
in: path
required: true
schema:
type: string
requestBody:
content:
application/json:
schema:
$ref: '#/components/schemas/CreateStreamAppAcl'
responses:
200:
description: OK
/api/authorization:
get:
tags:
@ -2916,6 +2979,10 @@ components:
type: string
schemaType:
$ref: '#/components/schemas/SchemaType'
references:
type: array
items:
$ref: '#/components/schemas/SchemaReference'
required:
- id
- subject
@ -2933,13 +3000,30 @@ components:
schema:
type: string
schemaType:
$ref: '#/components/schemas/SchemaType'
# upon updating a schema, the type of existing schema can't be changed
$ref: '#/components/schemas/SchemaType' # upon updating a schema, the type of existing schema can't be changed
references:
type: array
items:
$ref: '#/components/schemas/SchemaReference'
required:
- subject
- schema
- schemaType
SchemaReference:
type: object
properties:
name:
type: string
subject:
type: string
version:
type: integer
required:
- name
- subject
- version
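With the new references field, a schema creation or update request can point at named types registered under other subjects. A purely illustrative payload (the subject names and Avro content are made up) matching the object and SchemaReference shapes defined above:

class SchemaReferencePayloadSketch {
  static final String SCHEMA_WITH_REFERENCE = """
      {
        "subject": "orders-value",
        "schemaType": "AVRO",
        "schema": "{\\"type\\":\\"record\\",\\"name\\":\\"Order\\",\\"fields\\":[{\\"name\\":\\"item\\",\\"type\\":\\"Item\\"}]}",
        "references": [
          {"name": "Item", "subject": "item-value", "version": 1}
        ]
      }
      """;
}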
CompatibilityLevel:
type: object
properties:
@ -3530,7 +3614,7 @@ components:
principal:
type: string
host:
type: string # "*" if acl can be applied to any resource of given type
type: string
operation:
type: string
enum:
@ -3554,6 +3638,69 @@ components:
- ALLOW
- DENY
CreateConsumerAcl:
type: object
required: [principal, host]
properties:
principal:
type: string
host:
type: string
topics:
type: array
items:
type: string
topicsPrefix:
type: string
consumerGroups:
type: array
items:
type: string
consumerGroupsPrefix:
type: string
CreateProducerAcl:
type: object
required: [principal, host]
properties:
principal:
type: string
host:
type: string
topics:
type: array
items:
type: string
topicsPrefix:
type: string
transactionalId:
type: string
transactionsIdPrefix:
type: string
idempotent:
type: boolean
default: false
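As a concrete illustration (principal and host values invented), the prefix-based producer ACL test earlier in this diff corresponds to a request body like:

class ProducerAclPayloadSketch {
  static final String PREFIX_BASED_PRODUCER_ACL = """
      {
        "principal": "User:producer-app",
        "host": "*",
        "topicsPrefix": "topicPref",
        "transactionsIdPrefix": "txIdPref",
        "idempotent": false
      }
      """;
}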
CreateStreamAppAcl:
type: object
required: [principal, host, applicationId, inputTopics, outputTopics]
properties:
principal:
type: string
host:
type: string
inputTopics:
type: array
items:
type: string
outputTopics:
type: array
items:
type: string
applicationId:
nullable: false
type: string
KafkaAclResourceType:
type: string
enum:

View file

@ -19,12 +19,12 @@
<selenium.version>4.8.1</selenium.version>
<selenide.version>6.12.3</selenide.version>
<testng.version>7.7.1</testng.version>
<allure.version>2.21.0</allure.version>
<qase.io.version>3.0.4</qase.io.version>
<allure.version>2.22.2</allure.version>
<qase.io.version>3.0.5</qase.io.version>
<aspectj.version>1.9.9.1</aspectj.version>
<assertj.version>3.24.2</assertj.version>
<hamcrest.version>2.2</hamcrest.version>
<slf4j.version>2.0.5</slf4j.version>
<slf4j.version>2.0.7</slf4j.version>
<kafka.version>3.3.1</kafka.version>
</properties>

View file

@ -0,0 +1,15 @@
package com.provectus.kafka.ui.utilities;
import java.util.stream.IntStream;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class StringUtils {
public static String getMixedCase(String original) {
return IntStream.range(0, original.length())
.mapToObj(i -> i % 2 == 0 ? Character.toUpperCase(original.charAt(i)) : original.charAt(i))
.collect(StringBuilder::new, StringBuilder::appendCodePoint, StringBuilder::append)
.toString();
}
}
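A quick usage sketch of the helper above (same package assumed; output shown for an example input):

package com.provectus.kafka.ui.utilities;

class StringUtilsUsageSketch {
  public static void main(String[] args) {
    // upper-cases characters at even (0-based) indices; non-letters pass through unchanged
    System.out.println(StringUtils.getMixedCase("topic-name")); // ToPiC-NaMe
  }
}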

View file

@ -1,6 +1,5 @@
package com.provectus.kafka.ui.manualsuite.backlog;
import static com.provectus.kafka.ui.qasesuite.BaseQaseTest.BROKERS_SUITE_ID;
import static com.provectus.kafka.ui.qasesuite.BaseQaseTest.SCHEMAS_SUITE_ID;
import static com.provectus.kafka.ui.qasesuite.BaseQaseTest.TOPICS_PROFILE_SUITE_ID;
import static com.provectus.kafka.ui.qasesuite.BaseQaseTest.TOPICS_SUITE_ID;
@ -57,24 +56,17 @@ public class SmokeBacklog extends BaseManualTest {
public void testCaseF() {
}
@Automation(state = TO_BE_AUTOMATED)
@Suite(id = BROKERS_SUITE_ID)
@QaseId(348)
@Test
public void testCaseG() {
}
@Automation(state = NOT_AUTOMATED)
@Suite(id = TOPICS_SUITE_ID)
@QaseId(50)
@Test
public void testCaseH() {
public void testCaseG() {
}
@Automation(state = NOT_AUTOMATED)
@Suite(id = SCHEMAS_SUITE_ID)
@QaseId(351)
@Test
public void testCaseI() {
public void testCaseH() {
}
}

Some files were not shown because too many files have changed in this diff.