Compare commits

...
Sign in to create a new pull request.

6 commits

Author SHA1 Message Date
Nail Badiullin
b0c367cac7 FE: Fix HTTP 400 for clusters with special symbols (#3591)
(cherry picked from commit ecc8db1948)
2023-04-14 00:12:47 +08:00
Nail Badiullin
d10996ed60 [FE] Messages filtering by offset & timestamp doesn't work (#3582)
(cherry picked from commit 83f9432569)
2023-04-14 00:12:11 +08:00
Roman Zabaluev
8e781e5d80 BE: Exempt appconfig from rbac check (#3647)
(cherry picked from commit b3240d9057)
2023-04-14 00:12:03 +08:00
Ilya Kuramshin
183317c781 BE: Fix loading freezes in case one of the brokers is down (#3618)
Co-authored-by: iliax <ikuramshin@provectus.com>
Co-authored-by: Roman Zabaluev <rzabaluev@provectus.com>
(cherry picked from commit dbdced5bab)
2023-04-14 00:11:53 +08:00
Roman Zabaluev
fcd88240ba FE: Fix config param source nullability (#3661)
(cherry picked from commit c148f112a4)
2023-04-14 00:11:36 +08:00
Ilya Kuramshin
b991fb3560 ISSUE-3144: CVE fixes, Springboot upd (#3624)
* ISSUE-3144: Spring boot version bump to 3.0.5, snakeyaml upd
* explicit spring security dependency removed
* openapi plugin updated to 6.5
* Some javax.annotation imports migrated to jakarta.annotation
* base container sha specified
* Update CognitoAuthorityExtractor

(cherry picked from commit ee1cd72dd5)
2023-04-14 00:09:36 +08:00
25 changed files with 388 additions and 159 deletions

View file

@ -1,4 +1,5 @@
FROM azul/zulu-openjdk-alpine:17-jre
#FROM azul/zulu-openjdk-alpine:17-jre-headless
FROM azul/zulu-openjdk-alpine@sha256:a36679ac0d28cb835e2a8c00e1e0d95509c6c51c5081c7782b85edb1f37a771a
RUN apk add --no-cache gcompat # need to make snappy codec work
RUN addgroup -S kafkaui && adduser -S kafkaui -G kafkaui

View file

@ -6,7 +6,13 @@ import com.provectus.kafka.ui.config.ClustersProperties;
import com.provectus.kafka.ui.connect.ApiClient;
import com.provectus.kafka.ui.connect.api.KafkaConnectClientApi;
import com.provectus.kafka.ui.connect.model.Connector;
import com.provectus.kafka.ui.connect.model.ConnectorPlugin;
import com.provectus.kafka.ui.connect.model.ConnectorPluginConfigValidationResponse;
import com.provectus.kafka.ui.connect.model.ConnectorStatus;
import com.provectus.kafka.ui.connect.model.ConnectorTask;
import com.provectus.kafka.ui.connect.model.ConnectorTopics;
import com.provectus.kafka.ui.connect.model.NewConnector;
import com.provectus.kafka.ui.connect.model.TaskStatus;
import com.provectus.kafka.ui.exception.KafkaConnectConflictReponseException;
import com.provectus.kafka.ui.exception.ValidationException;
import com.provectus.kafka.ui.util.WebClientConfigurator;
@ -15,11 +21,7 @@ import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
import lombok.extern.slf4j.Slf4j;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
import org.springframework.http.MediaType;
import org.springframework.util.MultiValueMap;
import org.springframework.http.ResponseEntity;
import org.springframework.util.unit.DataSize;
import org.springframework.web.client.RestClientException;
import org.springframework.web.reactive.function.client.WebClient;
@ -79,6 +81,176 @@ public class RetryingKafkaConnectClient extends KafkaConnectClientApi {
);
}
@Override
public Mono<ResponseEntity<Connector>> createConnectorWithHttpInfo(NewConnector newConnector)
throws WebClientResponseException {
return withRetryOnConflict(super.createConnectorWithHttpInfo(newConnector));
}
@Override
public Mono<Void> deleteConnector(String connectorName) throws WebClientResponseException {
return withRetryOnConflict(super.deleteConnector(connectorName));
}
@Override
public Mono<ResponseEntity<Void>> deleteConnectorWithHttpInfo(String connectorName)
throws WebClientResponseException {
return withRetryOnConflict(super.deleteConnectorWithHttpInfo(connectorName));
}
@Override
public Mono<Connector> getConnector(String connectorName) throws WebClientResponseException {
return withRetryOnConflict(super.getConnector(connectorName));
}
@Override
public Mono<ResponseEntity<Connector>> getConnectorWithHttpInfo(String connectorName)
throws WebClientResponseException {
return withRetryOnConflict(super.getConnectorWithHttpInfo(connectorName));
}
@Override
public Mono<Map<String, Object>> getConnectorConfig(String connectorName) throws WebClientResponseException {
return withRetryOnConflict(super.getConnectorConfig(connectorName));
}
@Override
public Mono<ResponseEntity<Map<String, Object>>> getConnectorConfigWithHttpInfo(String connectorName)
throws WebClientResponseException {
return withRetryOnConflict(super.getConnectorConfigWithHttpInfo(connectorName));
}
@Override
public Flux<ConnectorPlugin> getConnectorPlugins() throws WebClientResponseException {
return withRetryOnConflict(super.getConnectorPlugins());
}
@Override
public Mono<ResponseEntity<List<ConnectorPlugin>>> getConnectorPluginsWithHttpInfo()
throws WebClientResponseException {
return withRetryOnConflict(super.getConnectorPluginsWithHttpInfo());
}
@Override
public Mono<ConnectorStatus> getConnectorStatus(String connectorName) throws WebClientResponseException {
return withRetryOnConflict(super.getConnectorStatus(connectorName));
}
@Override
public Mono<ResponseEntity<ConnectorStatus>> getConnectorStatusWithHttpInfo(String connectorName)
throws WebClientResponseException {
return withRetryOnConflict(super.getConnectorStatusWithHttpInfo(connectorName));
}
@Override
public Mono<TaskStatus> getConnectorTaskStatus(String connectorName, Integer taskId)
throws WebClientResponseException {
return withRetryOnConflict(super.getConnectorTaskStatus(connectorName, taskId));
}
@Override
public Mono<ResponseEntity<TaskStatus>> getConnectorTaskStatusWithHttpInfo(String connectorName, Integer taskId)
throws WebClientResponseException {
return withRetryOnConflict(super.getConnectorTaskStatusWithHttpInfo(connectorName, taskId));
}
@Override
public Flux<ConnectorTask> getConnectorTasks(String connectorName) throws WebClientResponseException {
return withRetryOnConflict(super.getConnectorTasks(connectorName));
}
@Override
public Mono<ResponseEntity<List<ConnectorTask>>> getConnectorTasksWithHttpInfo(String connectorName)
throws WebClientResponseException {
return withRetryOnConflict(super.getConnectorTasksWithHttpInfo(connectorName));
}
@Override
public Mono<Map<String, ConnectorTopics>> getConnectorTopics(String connectorName) throws WebClientResponseException {
return withRetryOnConflict(super.getConnectorTopics(connectorName));
}
@Override
public Mono<ResponseEntity<Map<String, ConnectorTopics>>> getConnectorTopicsWithHttpInfo(String connectorName)
throws WebClientResponseException {
return withRetryOnConflict(super.getConnectorTopicsWithHttpInfo(connectorName));
}
@Override
public Flux<String> getConnectors(String search) throws WebClientResponseException {
return withRetryOnConflict(super.getConnectors(search));
}
@Override
public Mono<ResponseEntity<List<String>>> getConnectorsWithHttpInfo(String search) throws WebClientResponseException {
return withRetryOnConflict(super.getConnectorsWithHttpInfo(search));
}
@Override
public Mono<Void> pauseConnector(String connectorName) throws WebClientResponseException {
return withRetryOnConflict(super.pauseConnector(connectorName));
}
@Override
public Mono<ResponseEntity<Void>> pauseConnectorWithHttpInfo(String connectorName) throws WebClientResponseException {
return withRetryOnConflict(super.pauseConnectorWithHttpInfo(connectorName));
}
@Override
public Mono<Void> restartConnector(String connectorName, Boolean includeTasks, Boolean onlyFailed)
throws WebClientResponseException {
return withRetryOnConflict(super.restartConnector(connectorName, includeTasks, onlyFailed));
}
@Override
public Mono<ResponseEntity<Void>> restartConnectorWithHttpInfo(String connectorName, Boolean includeTasks,
Boolean onlyFailed) throws WebClientResponseException {
return withRetryOnConflict(super.restartConnectorWithHttpInfo(connectorName, includeTasks, onlyFailed));
}
@Override
public Mono<Void> restartConnectorTask(String connectorName, Integer taskId) throws WebClientResponseException {
return withRetryOnConflict(super.restartConnectorTask(connectorName, taskId));
}
@Override
public Mono<ResponseEntity<Void>> restartConnectorTaskWithHttpInfo(String connectorName, Integer taskId)
throws WebClientResponseException {
return withRetryOnConflict(super.restartConnectorTaskWithHttpInfo(connectorName, taskId));
}
@Override
public Mono<Void> resumeConnector(String connectorName) throws WebClientResponseException {
return super.resumeConnector(connectorName);
}
@Override
public Mono<ResponseEntity<Void>> resumeConnectorWithHttpInfo(String connectorName)
throws WebClientResponseException {
return withRetryOnConflict(super.resumeConnectorWithHttpInfo(connectorName));
}
@Override
public Mono<ResponseEntity<Connector>> setConnectorConfigWithHttpInfo(String connectorName,
Map<String, Object> requestBody)
throws WebClientResponseException {
return withRetryOnConflict(super.setConnectorConfigWithHttpInfo(connectorName, requestBody));
}
@Override
public Mono<ConnectorPluginConfigValidationResponse> validateConnectorPluginConfig(String pluginName,
Map<String, Object> requestBody)
throws WebClientResponseException {
return withRetryOnConflict(super.validateConnectorPluginConfig(pluginName, requestBody));
}
@Override
public Mono<ResponseEntity<ConnectorPluginConfigValidationResponse>> validateConnectorPluginConfigWithHttpInfo(
String pluginName, Map<String, Object> requestBody) throws WebClientResponseException {
return withRetryOnConflict(super.validateConnectorPluginConfigWithHttpInfo(pluginName, requestBody));
}
private static class RetryingApiClient extends ApiClient {
public RetryingApiClient(ConnectCluster config,
@ -108,35 +280,5 @@ public class RetryingKafkaConnectClient extends KafkaConnectClientApi {
.configureBufferSize(maxBuffSize)
.build();
}
@Override
public <T> Mono<T> invokeAPI(String path, HttpMethod method, Map<String, Object> pathParams,
MultiValueMap<String, String> queryParams, Object body,
HttpHeaders headerParams,
MultiValueMap<String, String> cookieParams,
MultiValueMap<String, Object> formParams, List<MediaType> accept,
MediaType contentType, String[] authNames,
ParameterizedTypeReference<T> returnType)
throws RestClientException {
return withRetryOnConflict(
super.invokeAPI(path, method, pathParams, queryParams, body, headerParams, cookieParams,
formParams, accept, contentType, authNames, returnType)
);
}
@Override
public <T> Flux<T> invokeFluxAPI(String path, HttpMethod method, Map<String, Object> pathParams,
MultiValueMap<String, String> queryParams, Object body,
HttpHeaders headerParams,
MultiValueMap<String, String> cookieParams,
MultiValueMap<String, Object> formParams,
List<MediaType> accept, MediaType contentType,
String[] authNames, ParameterizedTypeReference<T> returnType)
throws RestClientException {
return withRetryOnConflict(
super.invokeFluxAPI(path, method, pathParams, queryParams, body, headerParams,
cookieParams, formParams, accept, contentType, authNames, returnType)
);
}
}
}

View file

@ -1,6 +1,7 @@
package com.provectus.kafka.ui.config;
import com.provectus.kafka.ui.model.MetricsConfig;
import jakarta.annotation.PostConstruct;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
@ -8,7 +9,6 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import javax.annotation.Nullable;
import javax.annotation.PostConstruct;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;

View file

@ -1,9 +1,9 @@
package com.provectus.kafka.ui.config.auth;
import jakarta.annotation.PostConstruct;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import javax.annotation.PostConstruct;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.util.Assert;

View file

@ -149,10 +149,9 @@ public class KafkaConnectController extends AbstractController implements KafkaC
}
@Override
public Mono<ResponseEntity<ConnectorDTO>> setConnectorConfig(String clusterName,
String connectName,
public Mono<ResponseEntity<ConnectorDTO>> setConnectorConfig(String clusterName, String connectName,
String connectorName,
@Valid Mono<Object> requestBody,
Mono<Map<String, Object>> requestBody,
ServerWebExchange exchange) {
Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
@ -164,8 +163,7 @@ public class KafkaConnectController extends AbstractController implements KafkaC
return validateAccess.then(
kafkaConnectService
.setConnectorConfig(getCluster(clusterName), connectName, connectorName, requestBody)
.map(ResponseEntity::ok)
);
.map(ResponseEntity::ok));
}
@Override
@ -242,7 +240,7 @@ public class KafkaConnectController extends AbstractController implements KafkaC
@Override
public Mono<ResponseEntity<ConnectorPluginConfigValidationResponseDTO>> validateConnectorPluginConfig(
String clusterName, String connectName, String pluginName, @Valid Mono<Object> requestBody,
String clusterName, String connectName, String pluginName, @Valid Mono<Map<String, Object>> requestBody,
ServerWebExchange exchange) {
return kafkaConnectService
.validateConnectorPluginConfig(

View file

@ -134,7 +134,7 @@ public class GlobalErrorWebExceptionHandler extends AbstractErrorWebExceptionHan
.timestamp(currentTimestamp())
.stackTrace(Throwables.getStackTraceAsString(exception));
return ServerResponse
.status(exception.getStatus())
.status(exception.getStatusCode())
.contentType(MediaType.APPLICATION_JSON)
.bodyValue(response);
}

View file

@ -1,5 +1,6 @@
package com.provectus.kafka.ui.model.rbac;
import static com.provectus.kafka.ui.model.rbac.Resource.APPLICATIONCONFIG;
import static com.provectus.kafka.ui.model.rbac.Resource.CLUSTERCONFIG;
import static com.provectus.kafka.ui.model.rbac.Resource.KSQL;
@ -25,6 +26,8 @@ import org.springframework.util.Assert;
@EqualsAndHashCode
public class Permission {
private static final List<Resource> RBAC_ACTION_EXEMPT_LIST = List.of(KSQL, CLUSTERCONFIG, APPLICATIONCONFIG);
Resource resource;
List<String> actions;
@ -50,7 +53,7 @@ public class Permission {
public void validate() {
Assert.notNull(resource, "resource cannot be null");
if (!List.of(KSQL, CLUSTERCONFIG).contains(this.resource)) {
if (!RBAC_ACTION_EXEMPT_LIST.contains(this.resource)) {
Assert.notNull(value, "permission value can't be empty for resource " + resource);
}
}

View file

@ -1,38 +1,58 @@
package com.provectus.kafka.ui.service;
import static java.util.regex.Pattern.CASE_INSENSITIVE;
import com.google.common.collect.ImmutableList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.SslConfigs;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.actuate.endpoint.Sanitizer;
import org.springframework.stereotype.Component;
@Component
class KafkaConfigSanitizer extends Sanitizer {
private static final List<String> DEFAULT_PATTERNS_TO_SANITIZE = Arrays.asList(
"basic.auth.user.info", /* For Schema Registry credentials */
"password", "secret", "token", "key", ".*credentials.*", /* General credential patterns */
"aws.access.*", "aws.secret.*", "aws.session.*" /* AWS-related credential patterns */
);
class KafkaConfigSanitizer {
private static final String SANITIZED_VALUE = "******";
private static final String[] REGEX_PARTS = {"*", "$", "^", "+"};
private static final List<String> DEFAULT_PATTERNS_TO_SANITIZE = ImmutableList.<String>builder()
.addAll(kafkaConfigKeysToSanitize())
.add(
"basic.auth.user.info", /* For Schema Registry credentials */
"password", "secret", "token", "key", ".*credentials.*", /* General credential patterns */
"aws.access.*", "aws.secret.*", "aws.session.*" /* AWS-related credential patterns */
)
.build();
private final List<Pattern> sanitizeKeysPatterns;
KafkaConfigSanitizer(
@Value("${kafka.config.sanitizer.enabled:true}") boolean enabled,
@Value("${kafka.config.sanitizer.patterns:}") List<String> patternsToSanitize
) {
if (!enabled) {
setKeysToSanitize();
} else {
var keysToSanitize = new HashSet<>(
patternsToSanitize.isEmpty() ? DEFAULT_PATTERNS_TO_SANITIZE : patternsToSanitize);
keysToSanitize.addAll(kafkaConfigKeysToSanitize());
setKeysToSanitize(keysToSanitize.toArray(new String[] {}));
}
this.sanitizeKeysPatterns = enabled
? compile(patternsToSanitize.isEmpty() ? DEFAULT_PATTERNS_TO_SANITIZE : patternsToSanitize)
: List.of();
}
private static List<Pattern> compile(Collection<String> patternStrings) {
return patternStrings.stream()
.map(p -> isRegex(p)
? Pattern.compile(p, CASE_INSENSITIVE)
: Pattern.compile(".*" + p + "$", CASE_INSENSITIVE))
.toList();
}
private static boolean isRegex(String str) {
return Arrays.stream(REGEX_PARTS).anyMatch(str::contains);
}
private static Set<String> kafkaConfigKeysToSanitize() {
@ -45,4 +65,17 @@ class KafkaConfigSanitizer extends Sanitizer {
.collect(Collectors.toSet());
}
public Object sanitize(String key, Object value) {
if (value == null) {
return null;
}
for (Pattern pattern : sanitizeKeysPatterns) {
if (pattern.matcher(key).matches()) {
return SANITIZED_VALUE;
}
}
return value;
}
}

View file

@ -225,11 +225,11 @@ public class KafkaConnectService {
}
public Mono<ConnectorDTO> setConnectorConfig(KafkaCluster cluster, String connectName,
String connectorName, Mono<Object> requestBody) {
String connectorName, Mono<Map<String, Object>> requestBody) {
return api(cluster, connectName)
.mono(c ->
requestBody
.flatMap(body -> c.setConnectorConfig(connectorName, (Map<String, Object>) body))
.flatMap(body -> c.setConnectorConfig(connectorName, body))
.map(kafkaConnectMapper::fromClient));
}
@ -298,12 +298,12 @@ public class KafkaConnectService {
}
public Mono<ConnectorPluginConfigValidationResponseDTO> validateConnectorPluginConfig(
KafkaCluster cluster, String connectName, String pluginName, Mono<Object> requestBody) {
KafkaCluster cluster, String connectName, String pluginName, Mono<Map<String, Object>> requestBody) {
return api(cluster, connectName)
.mono(client ->
requestBody
.flatMap(body ->
client.validateConnectorPluginConfig(pluginName, (Map<String, Object>) body))
client.validateConnectorPluginConfig(pluginName, body))
.map(kafkaConnectMapper::fromClient)
);
}

View file

@ -4,6 +4,7 @@ import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toMap;
import static org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableTable;
import com.google.common.collect.Iterables;
import com.google.common.collect.Table;
@ -498,6 +499,14 @@ public class ReactiveAdminClient implements Closeable {
.flatMap(parts -> listOffsetsUnsafe(parts, offsetSpec));
}
/**
* List offset for the specified topics, skipping no-leader partitions.
*/
public Mono<Map<TopicPartition, Long>> listOffsets(Collection<TopicDescription> topicDescriptions,
OffsetSpec offsetSpec) {
return listOffsetsUnsafe(filterPartitionsWithLeaderCheck(topicDescriptions, p -> true, false), offsetSpec);
}
private Mono<Collection<TopicPartition>> filterPartitionsWithLeaderCheck(Collection<TopicPartition> partitions,
boolean failOnUnknownLeader) {
var targetTopics = partitions.stream().map(TopicPartition::topic).collect(Collectors.toSet());
@ -507,34 +516,44 @@ public class ReactiveAdminClient implements Closeable {
descriptions.values(), partitions::contains, failOnUnknownLeader));
}
private Set<TopicPartition> filterPartitionsWithLeaderCheck(Collection<TopicDescription> topicDescriptions,
@VisibleForTesting
static Set<TopicPartition> filterPartitionsWithLeaderCheck(Collection<TopicDescription> topicDescriptions,
Predicate<TopicPartition> partitionPredicate,
boolean failOnUnknownLeader) {
var goodPartitions = new HashSet<TopicPartition>();
for (TopicDescription description : topicDescriptions) {
var goodTopicPartitions = new ArrayList<TopicPartition>();
for (TopicPartitionInfo partitionInfo : description.partitions()) {
TopicPartition topicPartition = new TopicPartition(description.name(), partitionInfo.partition());
if (!partitionPredicate.test(topicPartition)) {
continue;
if (partitionInfo.leader() == null) {
if (failOnUnknownLeader) {
throw new ValidationException(String.format("Topic partition %s has no leader", topicPartition));
} else {
// if ANY of topic partitions has no leader - we have to skip all topic partitions
goodTopicPartitions.clear();
break;
}
}
if (partitionInfo.leader() != null) {
goodPartitions.add(topicPartition);
} else if (failOnUnknownLeader) {
throw new ValidationException(String.format("Topic partition %s has no leader", topicPartition));
if (partitionPredicate.test(topicPartition)) {
goodTopicPartitions.add(topicPartition);
}
}
goodPartitions.addAll(goodTopicPartitions);
}
return goodPartitions;
}
// 1. NOTE(!): should only apply for partitions with existing leader,
// 1. NOTE(!): should only apply for partitions from topics where all partitions have leaders,
// otherwise AdminClient will try to fetch topic metadata, fail and retry infinitely (until timeout)
// 2. NOTE(!): Skips partitions that were not initialized yet
// (UnknownTopicOrPartitionException thrown, ex. after topic creation)
// 3. TODO: check if it is a bug that AdminClient never throws LeaderNotAvailableException and just retrying instead
@KafkaClientInternalsDependant
public Mono<Map<TopicPartition, Long>> listOffsetsUnsafe(Collection<TopicPartition> partitions,
OffsetSpec offsetSpec) {
@VisibleForTesting
Mono<Map<TopicPartition, Long>> listOffsetsUnsafe(Collection<TopicPartition> partitions, OffsetSpec offsetSpec) {
if (partitions.isEmpty()) {
return Mono.just(Map.of());
}
Function<Collection<TopicPartition>, Mono<Map<TopicPartition, Long>>> call =
parts -> {

View file

@ -3,6 +3,7 @@ package com.provectus.kafka.ui.service;
import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toMap;
import com.google.common.collect.Sets;
import com.provectus.kafka.ui.config.ClustersProperties;
import com.provectus.kafka.ui.exception.TopicMetadataException;
import com.provectus.kafka.ui.exception.TopicNotFoundException;
@ -136,22 +137,14 @@ public class TopicsService {
}
private Mono<InternalPartitionsOffsets> getPartitionOffsets(Map<String, TopicDescription>
descriptions,
descriptionsMap,
ReactiveAdminClient ac) {
var topicPartitions = descriptions.values().stream()
.flatMap(desc ->
desc.partitions().stream()
// list offsets should only be applied to partitions with existing leader
// (see ReactiveAdminClient.listOffsetsUnsafe(..) docs)
.filter(tp -> tp.leader() != null)
.map(p -> new TopicPartition(desc.name(), p.partition())))
.collect(toList());
return ac.listOffsetsUnsafe(topicPartitions, OffsetSpec.earliest())
.zipWith(ac.listOffsetsUnsafe(topicPartitions, OffsetSpec.latest()),
var descriptions = descriptionsMap.values();
return ac.listOffsets(descriptions, OffsetSpec.earliest())
.zipWith(ac.listOffsets(descriptions, OffsetSpec.latest()),
(earliest, latest) ->
topicPartitions.stream()
.filter(tp -> earliest.containsKey(tp) && latest.containsKey(tp))
Sets.intersection(earliest.keySet(), latest.keySet())
.stream()
.map(tp ->
Map.entry(tp,
new InternalPartitionsOffsets.Offsets(

View file

@ -2,7 +2,7 @@ package com.provectus.kafka.ui.service.analyze;
import com.provectus.kafka.ui.model.TopicAnalysisSizeStatsDTO;
import com.provectus.kafka.ui.model.TopicAnalysisStatsDTO;
import com.provectus.kafka.ui.model.TopicAnalysisStatsHourlyMsgCountsDTO;
import com.provectus.kafka.ui.model.TopicAnalysisStatsHourlyMsgCountsInnerDTO;
import java.time.Duration;
import java.time.Instant;
import java.util.Comparator;
@ -78,10 +78,10 @@ class TopicAnalysisStats {
}
}
List<TopicAnalysisStatsHourlyMsgCountsDTO> toDto() {
List<TopicAnalysisStatsHourlyMsgCountsInnerDTO> toDto() {
return hourlyStats.entrySet().stream()
.sorted(Comparator.comparingLong(Map.Entry::getKey))
.map(e -> new TopicAnalysisStatsHourlyMsgCountsDTO()
.map(e -> new TopicAnalysisStatsHourlyMsgCountsInnerDTO()
.hourStart(e.getKey())
.count(e.getValue()))
.collect(Collectors.toList());

View file

@ -21,6 +21,7 @@ import com.provectus.kafka.ui.service.rbac.extractor.GithubAuthorityExtractor;
import com.provectus.kafka.ui.service.rbac.extractor.GoogleAuthorityExtractor;
import com.provectus.kafka.ui.service.rbac.extractor.LdapAuthorityExtractor;
import com.provectus.kafka.ui.service.rbac.extractor.ProviderAuthorityExtractor;
import jakarta.annotation.PostConstruct;
import java.util.Collections;
import java.util.List;
import java.util.Set;
@ -28,7 +29,6 @@ import java.util.function.Predicate;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import javax.annotation.PostConstruct;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;

View file

@ -1,9 +1,9 @@
package com.provectus.kafka.ui.service.rbac.extractor;
import com.nimbusds.jose.shaded.json.JSONArray;
import com.provectus.kafka.ui.model.rbac.Role;
import com.provectus.kafka.ui.model.rbac.provider.Provider;
import com.provectus.kafka.ui.service.rbac.AccessControlService;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
@ -44,7 +44,7 @@ public class CognitoAuthorityExtractor implements ProviderAuthorityExtractor {
.map(Role::getName)
.collect(Collectors.toSet());
JSONArray groups = principal.getAttribute(COGNITO_GROUPS_ATTRIBUTE_NAME);
List<String> groups = principal.getAttribute(COGNITO_GROUPS_ATTRIBUTE_NAME);
if (groups == null) {
log.debug("Cognito groups param is not present");
return Mono.just(groupsByUsername);
@ -56,9 +56,8 @@ public class CognitoAuthorityExtractor implements ProviderAuthorityExtractor {
.stream()
.filter(s -> s.getProvider().equals(Provider.OAUTH_COGNITO))
.filter(s -> s.getType().equals("group"))
.anyMatch(subject -> Stream.of(groups.toArray())
.anyMatch(subject -> Stream.of(groups)
.map(Object::toString)
.distinct()
.anyMatch(cognitoGroup -> cognitoGroup.equals(subject.getValue()))
))
.map(Role::getName)

View file

@ -16,7 +16,7 @@ import org.springframework.context.ApplicationContextInitializer;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.util.SocketUtils;
import org.springframework.test.util.TestSocketUtils;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.utility.DockerImageName;
@ -61,7 +61,7 @@ public abstract class AbstractIntegrationTest {
System.setProperty("kafka.clusters.0.bootstrapServers", kafka.getBootstrapServers());
// List unavailable hosts to verify failover
System.setProperty("kafka.clusters.0.schemaRegistry", String.format("http://localhost:%1$s,http://localhost:%1$s,%2$s",
SocketUtils.findAvailableTcpPort(), schemaRegistry.getUrl()));
TestSocketUtils.findAvailableTcpPort(), schemaRegistry.getUrl()));
System.setProperty("kafka.clusters.0.kafkaConnect.0.name", "kafka-connect");
System.setProperty("kafka.clusters.0.kafkaConnect.0.userName", "kafka-connect");
System.setProperty("kafka.clusters.0.kafkaConnect.0.password", "kafka-connect");

View file

@ -5,13 +5,12 @@ import static org.assertj.core.api.Assertions.assertThat;
import java.util.Arrays;
import java.util.Collections;
import org.junit.jupiter.api.Test;
import org.springframework.boot.actuate.endpoint.Sanitizer;
class KafkaConfigSanitizerTest {
@Test
void doNothingIfEnabledPropertySetToFalse() {
final Sanitizer sanitizer = new KafkaConfigSanitizer(false, Collections.emptyList());
final var sanitizer = new KafkaConfigSanitizer(false, Collections.emptyList());
assertThat(sanitizer.sanitize("password", "secret")).isEqualTo("secret");
assertThat(sanitizer.sanitize("sasl.jaas.config", "secret")).isEqualTo("secret");
assertThat(sanitizer.sanitize("database.password", "secret")).isEqualTo("secret");
@ -19,7 +18,7 @@ class KafkaConfigSanitizerTest {
@Test
void obfuscateCredentials() {
final Sanitizer sanitizer = new KafkaConfigSanitizer(true, Collections.emptyList());
final var sanitizer = new KafkaConfigSanitizer(true, Collections.emptyList());
assertThat(sanitizer.sanitize("sasl.jaas.config", "secret")).isEqualTo("******");
assertThat(sanitizer.sanitize("consumer.sasl.jaas.config", "secret")).isEqualTo("******");
assertThat(sanitizer.sanitize("producer.sasl.jaas.config", "secret")).isEqualTo("******");
@ -37,7 +36,7 @@ class KafkaConfigSanitizerTest {
@Test
void notObfuscateNormalConfigs() {
final Sanitizer sanitizer = new KafkaConfigSanitizer(true, Collections.emptyList());
final var sanitizer = new KafkaConfigSanitizer(true, Collections.emptyList());
assertThat(sanitizer.sanitize("security.protocol", "SASL_SSL")).isEqualTo("SASL_SSL");
final String[] bootstrapServer = new String[] {"test1:9092", "test2:9092"};
assertThat(sanitizer.sanitize("bootstrap.servers", bootstrapServer)).isEqualTo(bootstrapServer);
@ -45,7 +44,7 @@ class KafkaConfigSanitizerTest {
@Test
void obfuscateCredentialsWithDefinedPatterns() {
final Sanitizer sanitizer = new KafkaConfigSanitizer(true, Arrays.asList("kafka.ui", ".*test.*"));
final var sanitizer = new KafkaConfigSanitizer(true, Arrays.asList("kafka.ui", ".*test.*"));
assertThat(sanitizer.sanitize("consumer.kafka.ui", "secret")).isEqualTo("******");
assertThat(sanitizer.sanitize("this.is.test.credentials", "secret")).isEqualTo("******");
assertThat(sanitizer.sanitize("this.is.not.credential", "not.credential"))

View file

@ -4,8 +4,11 @@ import static com.provectus.kafka.ui.service.ReactiveAdminClient.toMonoWithExcep
import static java.util.Objects.requireNonNull;
import static org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.assertj.core.api.ThrowableAssert.ThrowingCallable;
import com.provectus.kafka.ui.AbstractIntegrationTest;
import com.provectus.kafka.ui.exception.ValidationException;
import com.provectus.kafka.ui.producer.KafkaTestProducer;
import java.time.Duration;
import java.util.ArrayList;
@ -22,16 +25,20 @@ import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.TopicPartitionInfo;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.internals.KafkaFutureImpl;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.assertj.core.api.ThrowableAssert;
import org.junit.function.ThrowingRunnable;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
@ -133,6 +140,56 @@ class ReactiveAdminClientTest extends AbstractIntegrationTest {
.verifyComplete();
}
@Test
void filterPartitionsWithLeaderCheckSkipsPartitionsFromTopicWhereSomePartitionsHaveNoLeader() {
var filteredPartitions = ReactiveAdminClient.filterPartitionsWithLeaderCheck(
List.of(
// contains partitions with no leader
new TopicDescription("noLeaderTopic", false,
List.of(
new TopicPartitionInfo(0, new Node(1, "n1", 9092), List.of(), List.of()),
new TopicPartitionInfo(1, null, List.of(), List.of()))),
// should be skipped by predicate
new TopicDescription("skippingByPredicate", false,
List.of(
new TopicPartitionInfo(0, new Node(1, "n1", 9092), List.of(), List.of()))),
// good topic
new TopicDescription("good", false,
List.of(
new TopicPartitionInfo(0, new Node(1, "n1", 9092), List.of(), List.of()),
new TopicPartitionInfo(1, new Node(2, "n2", 9092), List.of(), List.of()))
)),
p -> !p.topic().equals("skippingByPredicate"),
false
);
assertThat(filteredPartitions)
.containsExactlyInAnyOrder(
new TopicPartition("good", 0),
new TopicPartition("good", 1)
);
}
@Test
void filterPartitionsWithLeaderCheckThrowExceptionIfThereIsSomePartitionsWithoutLeaderAndFlagSet() {
ThrowingCallable call = () -> ReactiveAdminClient.filterPartitionsWithLeaderCheck(
List.of(
// contains partitions with no leader
new TopicDescription("t1", false,
List.of(
new TopicPartitionInfo(0, new Node(1, "n1", 9092), List.of(), List.of()),
new TopicPartitionInfo(1, null, List.of(), List.of()))),
new TopicDescription("t2", false,
List.of(
new TopicPartitionInfo(0, new Node(1, "n1", 9092), List.of(), List.of()))
)),
p -> true,
// setting failOnNoLeader flag
true
);
assertThatThrownBy(call).isInstanceOf(ValidationException.class);
}
@Test
void testListOffsetsUnsafe() {
String topic = UUID.randomUUID().toString();

View file

@ -15,7 +15,6 @@ import java.util.concurrent.CopyOnWriteArraySet;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.springframework.util.unit.DataSize;
import org.testcontainers.utility.DockerImageName;
class KsqlServiceV2Test extends AbstractIntegrationTest {
@ -27,8 +26,6 @@ class KsqlServiceV2Test extends AbstractIntegrationTest {
private static final Set<String> STREAMS_TO_DELETE = new CopyOnWriteArraySet<>();
private static final Set<String> TABLES_TO_DELETE = new CopyOnWriteArraySet<>();
private static final DataSize maxBuffSize = DataSize.ofMegabytes(20);
@BeforeAll
static void init() {
KSQL_DB.start();

View file

@ -27,20 +27,24 @@
<artifactId>spring-boot-starter-validation</artifactId>
</dependency>
<dependency>
<groupId>io.swagger</groupId>
<artifactId>swagger-annotations</artifactId>
<version>${swagger-annotations.version}</version>
<groupId>io.swagger.core.v3</groupId>
<artifactId>swagger-integration-jakarta</artifactId>
<version>2.2.8</version>
</dependency>
<dependency>
<groupId>org.openapitools</groupId>
<artifactId>jackson-databind-nullable</artifactId>
<version>${jackson-databind-nullable.version}</version>
<version>0.2.4</version>
</dependency>
<dependency>
<groupId>com.google.code.findbugs</groupId>
<artifactId>jsr305</artifactId>
<version>3.0.2</version>
<scope>provided</scope>
<groupId>jakarta.annotation</groupId>
<artifactId>jakarta.annotation-api</artifactId>
<version>2.1.1</version>
</dependency>
<dependency>
<groupId>javax.annotation</groupId>
<artifactId>javax.annotation-api</artifactId>
<version>1.3.2</version>
</dependency>
</dependencies>
@ -71,6 +75,7 @@
<library>webclient</library>
<useBeanValidation>true</useBeanValidation>
<dateLibrary>java8</dateLibrary>
<useJakartaEe>true</useJakartaEe>
</configOptions>
</configuration>
</execution>
@ -80,8 +85,7 @@
<goal>generate</goal>
</goals>
<configuration>
<inputSpec>${project.basedir}/src/main/resources/swagger/kafka-ui-api.yaml
</inputSpec>
<inputSpec>${project.basedir}/src/main/resources/swagger/kafka-ui-api.yaml</inputSpec>
<output>${project.build.directory}/generated-sources/api</output>
<generatorName>spring</generatorName>
<modelNameSuffix>DTO</modelNameSuffix>
@ -89,14 +93,12 @@
<modelPackage>com.provectus.kafka.ui.model</modelPackage>
<apiPackage>com.provectus.kafka.ui.api</apiPackage>
<sourceFolder>kafka-ui-contract</sourceFolder>
<reactive>true</reactive>
<interfaceOnly>true</interfaceOnly>
<skipDefaultInterface>true</skipDefaultInterface>
<useBeanValidation>true</useBeanValidation>
<useTags>true</useTags>
<useSpringBoot3>true</useSpringBoot3>
<dateLibrary>java8</dateLibrary>
</configOptions>
<typeMappings>
@ -116,15 +118,13 @@
<generatorName>java</generatorName>
<generateApiTests>false</generateApiTests>
<generateModelTests>false</generateModelTests>
<configOptions>
<modelPackage>com.provectus.kafka.ui.connect.model</modelPackage>
<apiPackage>com.provectus.kafka.ui.connect.api</apiPackage>
<sourceFolder>kafka-connect-client</sourceFolder>
<asyncNative>true</asyncNative>
<library>webclient</library>
<useJakartaEe>true</useJakartaEe>
<useBeanValidation>true</useBeanValidation>
<dateLibrary>java8</dateLibrary>
</configOptions>
@ -142,15 +142,13 @@
<generatorName>java</generatorName>
<generateApiTests>false</generateApiTests>
<generateModelTests>false</generateModelTests>
<configOptions>
<modelPackage>com.provectus.kafka.ui.sr.model</modelPackage>
<apiPackage>com.provectus.kafka.ui.sr.api</apiPackage>
<sourceFolder>kafka-sr-client</sourceFolder>
<asyncNative>true</asyncNative>
<library>webclient</library>
<useJakartaEe>true</useJakartaEe>
<useBeanValidation>true</useBeanValidation>
<dateLibrary>java8</dateLibrary>
</configOptions>

View file

@ -2387,6 +2387,10 @@ components:
- UNKNOWN
ConsumerGroup:
discriminator:
propertyName: inherit
mapping:
details: "#/components/schemas/ConsumerGroupDetails"
type: object
properties:
groupId:

View file

@ -231,9 +231,13 @@ const Filters: React.FC<FiltersProps> = ({
props.seekType = SeekType.TIMESTAMP;
}
const isSeekTypeWithSeekTo =
props.seekType === SeekType.TIMESTAMP ||
props.seekType === SeekType.OFFSET;
if (
selectedPartitions.length !== partitions.length ||
currentSeekType === SeekType.TIMESTAMP
isSeekTypeWithSeekTo
) {
// not everything in the partition is selected
props.seekTo = selectedPartitions.map(({ value }) => {
@ -323,7 +327,9 @@ const Filters: React.FC<FiltersProps> = ({
// eslint-disable-next-line consistent-return
React.useEffect(() => {
if (location.search?.length !== 0) {
const url = `${BASE_PARAMS.basePath}/api/clusters/${clusterName}/topics/${topicName}/messages${location.search}`;
const url = `${BASE_PARAMS.basePath}/api/clusters/${encodeURIComponent(
clusterName
)}/topics/${topicName}/messages${location.search}`;
const sse = new EventSource(url);
source.current = sse;

View file

@ -49,7 +49,7 @@ const CustomParamField: React.FC<Props> = ({
label: option,
disabled:
(config &&
config[option].source !== ConfigSource.DYNAMIC_TOPIC_CONFIG) ||
config[option]?.source !== ConfigSource.DYNAMIC_TOPIC_CONFIG) ||
existingFields.includes(option),
}));

View file

@ -90,7 +90,9 @@ export const useKsqlkDbSSE = ({ clusterName, pipeId }: UseKsqlkDbSSEProps) => {
React.useEffect(() => {
const fetchData = async () => {
const url = `${BASE_PARAMS.basePath}/api/clusters/${clusterName}/ksql/response`;
const url = `${BASE_PARAMS.basePath}/api/clusters/${encodeURIComponent(
clusterName
)}/ksql/response`;
await fetchEventSource(
`${url}?${new URLSearchParams({ pipeId: pipeId || '' }).toString()}`,
{

View file

@ -51,7 +51,9 @@ export const useTopicMessages = ({
React.useEffect(() => {
const fetchData = async () => {
setIsFetching(true);
const url = `${BASE_PARAMS.basePath}/api/clusters/${clusterName}/topics/${topicName}/messages`;
const url = `${BASE_PARAMS.basePath}/api/clusters/${encodeURIComponent(
clusterName
)}/topics/${topicName}/messages`;
const requestParams = new URLSearchParams({
limit,
seekTo: seekTo.replaceAll('-', '::').replaceAll('.', ','),

30
pom.xml
View file

@ -30,17 +30,13 @@
<datasketches-java.version>3.1.0</datasketches-java.version>
<groovy.version>3.0.13</groovy.version>
<jackson.version>2.14.0</jackson.version>
<jackson-databind-nullable.version>0.2.4</jackson-databind-nullable.version>
<kafka-clients.version>3.3.1</kafka-clients.version>
<netty.version>4.1.85.Final</netty.version>
<org.mapstruct.version>1.4.2.Final</org.mapstruct.version>
<org.projectlombok.version>1.18.24</org.projectlombok.version>
<protobuf-java.version>3.21.9</protobuf-java.version>
<reactor-netty.version>1.1.0</reactor-netty.version>
<scala-lang.library.version>2.13.9</scala-lang.library.version>
<snakeyaml.version>1.33</snakeyaml.version>
<spring-boot.version>2.7.5</spring-boot.version>
<spring-security.version>5.7.5</spring-security.version>
<snakeyaml.version>2.0</snakeyaml.version>
<spring-boot.version>3.0.5</spring-boot.version>
<kafka-ui-serde-api.version>1.0.0</kafka-ui-serde-api.version>
<odd-oddrn-generator.version>0.1.15</odd-oddrn-generator.version>
<odd-oddrn-client.version>0.1.23</odd-oddrn-client.version>
@ -62,9 +58,8 @@
<maven-compiler-plugin.version>3.10.1</maven-compiler-plugin.version>
<maven-resources-plugin.version>3.2.0</maven-resources-plugin.version>
<maven-surefire-plugin.version>2.22.2</maven-surefire-plugin.version>
<openapi-generator-maven-plugin.version>4.3.0</openapi-generator-maven-plugin.version>
<openapi-generator-maven-plugin.version>6.5.0</openapi-generator-maven-plugin.version>
<springdoc-openapi-webflux-ui.version>1.2.32</springdoc-openapi-webflux-ui.version>
<swagger-annotations.version>1.6.0</swagger-annotations.version>
</properties>
<repositories>
@ -111,20 +106,6 @@
<type>pom</type>
<scope>import</scope>
</dependency>
<dependency>
<groupId>org.springframework.security</groupId>
<artifactId>spring-security-bom</artifactId>
<version>${spring-security.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-bom</artifactId>
<version>${netty.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson</groupId>
<artifactId>jackson-bom</artifactId>
@ -147,11 +128,6 @@
<artifactId>protobuf-java</artifactId>
<version>${protobuf-java.version}</version>
</dependency>
<dependency>
<groupId>io.projectreactor.netty</groupId>
<artifactId>reactor-netty-http</artifactId>
<version>${reactor-netty.version}</version>
</dependency>
<dependency>
<groupId>org.junit</groupId>
<artifactId>junit-bom</artifactId>