Compare commits
6 commits: master ... release/0.

Commits:
b0c367cac7
d10996ed60
8e781e5d80
183317c781
fcd88240ba
b991fb3560

25 changed files with 388 additions and 159 deletions
@@ -1,4 +1,5 @@
-FROM azul/zulu-openjdk-alpine:17-jre
+#FROM azul/zulu-openjdk-alpine:17-jre-headless
+FROM azul/zulu-openjdk-alpine@sha256:a36679ac0d28cb835e2a8c00e1e0d95509c6c51c5081c7782b85edb1f37a771a

 RUN apk add --no-cache gcompat # need to make snappy codec work
 RUN addgroup -S kafkaui && adduser -S kafkaui -G kafkaui
@@ -6,7 +6,13 @@ import com.provectus.kafka.ui.config.ClustersProperties;
 import com.provectus.kafka.ui.connect.ApiClient;
 import com.provectus.kafka.ui.connect.api.KafkaConnectClientApi;
+import com.provectus.kafka.ui.connect.model.Connector;
 import com.provectus.kafka.ui.connect.model.ConnectorPlugin;
 import com.provectus.kafka.ui.connect.model.ConnectorPluginConfigValidationResponse;
+import com.provectus.kafka.ui.connect.model.ConnectorStatus;
+import com.provectus.kafka.ui.connect.model.ConnectorTask;
+import com.provectus.kafka.ui.connect.model.ConnectorTopics;
+import com.provectus.kafka.ui.connect.model.NewConnector;
+import com.provectus.kafka.ui.connect.model.TaskStatus;
 import com.provectus.kafka.ui.exception.KafkaConnectConflictReponseException;
 import com.provectus.kafka.ui.exception.ValidationException;
 import com.provectus.kafka.ui.util.WebClientConfigurator;

@@ -15,11 +21,7 @@ import java.util.List;
 import java.util.Map;
 import javax.annotation.Nullable;
 import lombok.extern.slf4j.Slf4j;
-import org.springframework.core.ParameterizedTypeReference;
-import org.springframework.http.HttpHeaders;
-import org.springframework.http.HttpMethod;
-import org.springframework.http.MediaType;
-import org.springframework.util.MultiValueMap;
+import org.springframework.http.ResponseEntity;
 import org.springframework.util.unit.DataSize;
 import org.springframework.web.client.RestClientException;
 import org.springframework.web.reactive.function.client.WebClient;

@@ -79,6 +81,176 @@ public class RetryingKafkaConnectClient extends KafkaConnectClientApi {
     );
   }

+  @Override
+  public Mono<ResponseEntity<Connector>> createConnectorWithHttpInfo(NewConnector newConnector)
+      throws WebClientResponseException {
+    return withRetryOnConflict(super.createConnectorWithHttpInfo(newConnector));
+  }
+
+  @Override
+  public Mono<Void> deleteConnector(String connectorName) throws WebClientResponseException {
+    return withRetryOnConflict(super.deleteConnector(connectorName));
+  }
+
+  @Override
+  public Mono<ResponseEntity<Void>> deleteConnectorWithHttpInfo(String connectorName)
+      throws WebClientResponseException {
+    return withRetryOnConflict(super.deleteConnectorWithHttpInfo(connectorName));
+  }
+
+  @Override
+  public Mono<Connector> getConnector(String connectorName) throws WebClientResponseException {
+    return withRetryOnConflict(super.getConnector(connectorName));
+  }
+
+  @Override
+  public Mono<ResponseEntity<Connector>> getConnectorWithHttpInfo(String connectorName)
+      throws WebClientResponseException {
+    return withRetryOnConflict(super.getConnectorWithHttpInfo(connectorName));
+  }
+
+  @Override
+  public Mono<Map<String, Object>> getConnectorConfig(String connectorName) throws WebClientResponseException {
+    return withRetryOnConflict(super.getConnectorConfig(connectorName));
+  }
+
+  @Override
+  public Mono<ResponseEntity<Map<String, Object>>> getConnectorConfigWithHttpInfo(String connectorName)
+      throws WebClientResponseException {
+    return withRetryOnConflict(super.getConnectorConfigWithHttpInfo(connectorName));
+  }
+
+  @Override
+  public Flux<ConnectorPlugin> getConnectorPlugins() throws WebClientResponseException {
+    return withRetryOnConflict(super.getConnectorPlugins());
+  }
+
+  @Override
+  public Mono<ResponseEntity<List<ConnectorPlugin>>> getConnectorPluginsWithHttpInfo()
+      throws WebClientResponseException {
+    return withRetryOnConflict(super.getConnectorPluginsWithHttpInfo());
+  }
+
+  @Override
+  public Mono<ConnectorStatus> getConnectorStatus(String connectorName) throws WebClientResponseException {
+    return withRetryOnConflict(super.getConnectorStatus(connectorName));
+  }
+
+  @Override
+  public Mono<ResponseEntity<ConnectorStatus>> getConnectorStatusWithHttpInfo(String connectorName)
+      throws WebClientResponseException {
+    return withRetryOnConflict(super.getConnectorStatusWithHttpInfo(connectorName));
+  }
+
+  @Override
+  public Mono<TaskStatus> getConnectorTaskStatus(String connectorName, Integer taskId)
+      throws WebClientResponseException {
+    return withRetryOnConflict(super.getConnectorTaskStatus(connectorName, taskId));
+  }
+
+  @Override
+  public Mono<ResponseEntity<TaskStatus>> getConnectorTaskStatusWithHttpInfo(String connectorName, Integer taskId)
+      throws WebClientResponseException {
+    return withRetryOnConflict(super.getConnectorTaskStatusWithHttpInfo(connectorName, taskId));
+  }
+
+  @Override
+  public Flux<ConnectorTask> getConnectorTasks(String connectorName) throws WebClientResponseException {
+    return withRetryOnConflict(super.getConnectorTasks(connectorName));
+  }
+
+  @Override
+  public Mono<ResponseEntity<List<ConnectorTask>>> getConnectorTasksWithHttpInfo(String connectorName)
+      throws WebClientResponseException {
+    return withRetryOnConflict(super.getConnectorTasksWithHttpInfo(connectorName));
+  }
+
+  @Override
+  public Mono<Map<String, ConnectorTopics>> getConnectorTopics(String connectorName) throws WebClientResponseException {
+    return withRetryOnConflict(super.getConnectorTopics(connectorName));
+  }
+
+  @Override
+  public Mono<ResponseEntity<Map<String, ConnectorTopics>>> getConnectorTopicsWithHttpInfo(String connectorName)
+      throws WebClientResponseException {
+    return withRetryOnConflict(super.getConnectorTopicsWithHttpInfo(connectorName));
+  }
+
+  @Override
+  public Flux<String> getConnectors(String search) throws WebClientResponseException {
+    return withRetryOnConflict(super.getConnectors(search));
+  }
+
+  @Override
+  public Mono<ResponseEntity<List<String>>> getConnectorsWithHttpInfo(String search) throws WebClientResponseException {
+    return withRetryOnConflict(super.getConnectorsWithHttpInfo(search));
+  }
+
+  @Override
+  public Mono<Void> pauseConnector(String connectorName) throws WebClientResponseException {
+    return withRetryOnConflict(super.pauseConnector(connectorName));
+  }
+
+  @Override
+  public Mono<ResponseEntity<Void>> pauseConnectorWithHttpInfo(String connectorName) throws WebClientResponseException {
+    return withRetryOnConflict(super.pauseConnectorWithHttpInfo(connectorName));
+  }
+
+  @Override
+  public Mono<Void> restartConnector(String connectorName, Boolean includeTasks, Boolean onlyFailed)
+      throws WebClientResponseException {
+    return withRetryOnConflict(super.restartConnector(connectorName, includeTasks, onlyFailed));
+  }
+
+  @Override
+  public Mono<ResponseEntity<Void>> restartConnectorWithHttpInfo(String connectorName, Boolean includeTasks,
+                                                                 Boolean onlyFailed) throws WebClientResponseException {
+    return withRetryOnConflict(super.restartConnectorWithHttpInfo(connectorName, includeTasks, onlyFailed));
+  }
+
+  @Override
+  public Mono<Void> restartConnectorTask(String connectorName, Integer taskId) throws WebClientResponseException {
+    return withRetryOnConflict(super.restartConnectorTask(connectorName, taskId));
+  }
+
+  @Override
+  public Mono<ResponseEntity<Void>> restartConnectorTaskWithHttpInfo(String connectorName, Integer taskId)
+      throws WebClientResponseException {
+    return withRetryOnConflict(super.restartConnectorTaskWithHttpInfo(connectorName, taskId));
+  }
+
+  @Override
+  public Mono<Void> resumeConnector(String connectorName) throws WebClientResponseException {
+    return super.resumeConnector(connectorName);
+  }
+
+  @Override
+  public Mono<ResponseEntity<Void>> resumeConnectorWithHttpInfo(String connectorName)
+      throws WebClientResponseException {
+    return withRetryOnConflict(super.resumeConnectorWithHttpInfo(connectorName));
+  }
+
+  @Override
+  public Mono<ResponseEntity<Connector>> setConnectorConfigWithHttpInfo(String connectorName,
+                                                                        Map<String, Object> requestBody)
+      throws WebClientResponseException {
+    return withRetryOnConflict(super.setConnectorConfigWithHttpInfo(connectorName, requestBody));
+  }
+
+  @Override
+  public Mono<ConnectorPluginConfigValidationResponse> validateConnectorPluginConfig(String pluginName,
+                                                                                     Map<String, Object> requestBody)
+      throws WebClientResponseException {
+    return withRetryOnConflict(super.validateConnectorPluginConfig(pluginName, requestBody));
+  }
+
+  @Override
+  public Mono<ResponseEntity<ConnectorPluginConfigValidationResponse>> validateConnectorPluginConfigWithHttpInfo(
+      String pluginName, Map<String, Object> requestBody) throws WebClientResponseException {
+    return withRetryOnConflict(super.validateConnectorPluginConfigWithHttpInfo(pluginName, requestBody));
+  }
+
   private static class RetryingApiClient extends ApiClient {

     public RetryingApiClient(ConnectCluster config,

@@ -108,35 +280,5 @@ public class RetryingKafkaConnectClient extends KafkaConnectClientApi {
           .configureBufferSize(maxBuffSize)
           .build();
     }
-
-    @Override
-    public <T> Mono<T> invokeAPI(String path, HttpMethod method, Map<String, Object> pathParams,
-                                 MultiValueMap<String, String> queryParams, Object body,
-                                 HttpHeaders headerParams,
-                                 MultiValueMap<String, String> cookieParams,
-                                 MultiValueMap<String, Object> formParams, List<MediaType> accept,
-                                 MediaType contentType, String[] authNames,
-                                 ParameterizedTypeReference<T> returnType)
-        throws RestClientException {
-      return withRetryOnConflict(
-          super.invokeAPI(path, method, pathParams, queryParams, body, headerParams, cookieParams,
-              formParams, accept, contentType, authNames, returnType)
-      );
-    }
-
-    @Override
-    public <T> Flux<T> invokeFluxAPI(String path, HttpMethod method, Map<String, Object> pathParams,
-                                     MultiValueMap<String, String> queryParams, Object body,
-                                     HttpHeaders headerParams,
-                                     MultiValueMap<String, String> cookieParams,
-                                     MultiValueMap<String, Object> formParams,
-                                     List<MediaType> accept, MediaType contentType,
-                                     String[] authNames, ParameterizedTypeReference<T> returnType)
-        throws RestClientException {
-      return withRetryOnConflict(
-          super.invokeFluxAPI(path, method, pathParams, queryParams, body, headerParams,
-              cookieParams, formParams, accept, contentType, authNames, returnType)
-      );
-    }
   }
 }
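Every override in the block above delegates to a private withRetryOnConflict helper that sits outside the visible hunks. A plausible sketch of that helper, assuming Reactor's retry API and the KafkaConnectConflictReponseException imported above; the constants and exact shape are assumptions, not the confirmed implementation:

import java.time.Duration;
import org.springframework.web.reactive.function.client.WebClientResponseException;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.retry.Retry;

// Kafka Connect answers HTTP 409 (Conflict) while a rebalance is in progress,
// so the wrapper retries those responses a few times before giving up.
private static final int MAX_RETRIES = 5;                              // assumed value
private static final Duration RETRIES_DELAY = Duration.ofMillis(200); // assumed value

private static Retry conflictCodeRetry() {
  return Retry
      .fixedDelay(MAX_RETRIES, RETRIES_DELAY)
      .filter(e -> e instanceof WebClientResponseException.Conflict)
      .onRetryExhaustedThrow((spec, signal) ->
          new KafkaConnectConflictReponseException((WebClientResponseException.Conflict) signal.failure()));
}

private static <T> Mono<T> withRetryOnConflict(Mono<T> publisher) {
  return publisher.retryWhen(conflictCodeRetry());
}

private static <T> Flux<T> withRetryOnConflict(Flux<T> publisher) {
  return publisher.retryWhen(conflictCodeRetry());
}

Wrapping each public method, rather than the transport-level invokeAPI calls shown removed in the final hunk, keeps the retry visible at the call site and applies it uniformly to both the plain and the ...WithHttpInfo variants.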
@@ -1,6 +1,7 @@
 package com.provectus.kafka.ui.config;

 import com.provectus.kafka.ui.model.MetricsConfig;
+import jakarta.annotation.PostConstruct;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;

@@ -8,7 +9,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import javax.annotation.Nullable;
-import javax.annotation.PostConstruct;
 import lombok.AllArgsConstructor;
 import lombok.Builder;
 import lombok.Data;
@@ -1,9 +1,9 @@
 package com.provectus.kafka.ui.config.auth;

+import jakarta.annotation.PostConstruct;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
-import javax.annotation.PostConstruct;
 import lombok.Data;
 import org.springframework.boot.context.properties.ConfigurationProperties;
 import org.springframework.util.Assert;
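The two config classes above show a pattern that repeats throughout this compare: Spring Boot 3 builds on Spring Framework 6 and the Jakarta EE 9 namespace, so javax.annotation.* imports move to jakarta.annotation.* while the annotation semantics stay identical. A minimal sketch (the class name is hypothetical):

import jakarta.annotation.PostConstruct; // was: import javax.annotation.PostConstruct;

class JakartaMigrationExample {
  @PostConstruct
  void init() {
    // still runs once dependency injection completes - only the package changed
  }
}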
@@ -149,10 +149,9 @@ public class KafkaConnectController extends AbstractController implements KafkaC
   }

   @Override
-  public Mono<ResponseEntity<ConnectorDTO>> setConnectorConfig(String clusterName,
-                                                               String connectName,
+  public Mono<ResponseEntity<ConnectorDTO>> setConnectorConfig(String clusterName, String connectName,
                                                                String connectorName,
-                                                               @Valid Mono<Object> requestBody,
+                                                               Mono<Map<String, Object>> requestBody,
                                                                ServerWebExchange exchange) {

     Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()

@@ -164,8 +163,7 @@ public class KafkaConnectController extends AbstractController implements KafkaC
     return validateAccess.then(
         kafkaConnectService
             .setConnectorConfig(getCluster(clusterName), connectName, connectorName, requestBody)
-            .map(ResponseEntity::ok)
-    );
+            .map(ResponseEntity::ok));
   }

   @Override

@@ -242,7 +240,7 @@ public class KafkaConnectController extends AbstractController implements KafkaC

   @Override
   public Mono<ResponseEntity<ConnectorPluginConfigValidationResponseDTO>> validateConnectorPluginConfig(
-      String clusterName, String connectName, String pluginName, @Valid Mono<Object> requestBody,
+      String clusterName, String connectName, String pluginName, @Valid Mono<Map<String, Object>> requestBody,
       ServerWebExchange exchange) {
     return kafkaConnectService
         .validateConnectorPluginConfig(
@@ -134,7 +134,7 @@ public class GlobalErrorWebExceptionHandler extends AbstractErrorWebExceptionHan
         .timestamp(currentTimestamp())
         .stackTrace(Throwables.getStackTraceAsString(exception));
     return ServerResponse
-        .status(exception.getStatus())
+        .status(exception.getStatusCode())
         .contentType(MediaType.APPLICATION_JSON)
         .bodyValue(response);
   }
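This one-liner tracks a Spring Framework 6 API rename: ResponseStatusException now exposes getStatusCode(), returning the wider HttpStatusCode interface, where Spring 5 had getStatus() returning the HttpStatus enum. A small sketch of the new accessor (the helper class is hypothetical):

import org.springframework.http.HttpStatusCode;
import org.springframework.web.server.ResponseStatusException;

class StatusAccessorExample {
  HttpStatusCode statusOf(ResponseStatusException ex) {
    return ex.getStatusCode(); // Spring 5 equivalent: ex.getStatus()
  }
}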
@@ -1,5 +1,6 @@
 package com.provectus.kafka.ui.model.rbac;

+import static com.provectus.kafka.ui.model.rbac.Resource.APPLICATIONCONFIG;
 import static com.provectus.kafka.ui.model.rbac.Resource.CLUSTERCONFIG;
 import static com.provectus.kafka.ui.model.rbac.Resource.KSQL;

@@ -25,6 +26,8 @@ import org.springframework.util.Assert;
 @EqualsAndHashCode
 public class Permission {

+  private static final List<Resource> RBAC_ACTION_EXEMPT_LIST = List.of(KSQL, CLUSTERCONFIG, APPLICATIONCONFIG);
+
   Resource resource;
   List<String> actions;

@@ -50,7 +53,7 @@ public class Permission {

   public void validate() {
     Assert.notNull(resource, "resource cannot be null");
-    if (!List.of(KSQL, CLUSTERCONFIG).contains(this.resource)) {
+    if (!RBAC_ACTION_EXEMPT_LIST.contains(this.resource)) {
       Assert.notNull(value, "permission value can't be empty for resource " + resource);
     }
   }
@@ -1,38 +1,58 @@
 package com.provectus.kafka.ui.service;

+import static java.util.regex.Pattern.CASE_INSENSITIVE;
+
+import com.google.common.collect.ImmutableList;
 import java.util.Arrays;
-import java.util.HashSet;
+import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.SaslConfigs;
 import org.apache.kafka.common.config.SslConfigs;
 import org.springframework.beans.factory.annotation.Value;
-import org.springframework.boot.actuate.endpoint.Sanitizer;
 import org.springframework.stereotype.Component;

 @Component
-class KafkaConfigSanitizer extends Sanitizer {
-  private static final List<String> DEFAULT_PATTERNS_TO_SANITIZE = Arrays.asList(
-      "basic.auth.user.info",  /* For Schema Registry credentials */
-      "password", "secret", "token", "key", ".*credentials.*",  /* General credential patterns */
-      "aws.access.*", "aws.secret.*", "aws.session.*"  /* AWS-related credential patterns */
-  );
+class KafkaConfigSanitizer {
+
+  private static final String SANITIZED_VALUE = "******";
+
+  private static final String[] REGEX_PARTS = {"*", "$", "^", "+"};
+
+  private static final List<String> DEFAULT_PATTERNS_TO_SANITIZE = ImmutableList.<String>builder()
+      .addAll(kafkaConfigKeysToSanitize())
+      .add(
+          "basic.auth.user.info",  /* For Schema Registry credentials */
+          "password", "secret", "token", "key", ".*credentials.*",  /* General credential patterns */
+          "aws.access.*", "aws.secret.*", "aws.session.*"  /* AWS-related credential patterns */
+      )
+      .build();
+
+  private final List<Pattern> sanitizeKeysPatterns;

   KafkaConfigSanitizer(
       @Value("${kafka.config.sanitizer.enabled:true}") boolean enabled,
       @Value("${kafka.config.sanitizer.patterns:}") List<String> patternsToSanitize
   ) {
-    if (!enabled) {
-      setKeysToSanitize();
-    } else {
-      var keysToSanitize = new HashSet<>(
-          patternsToSanitize.isEmpty() ? DEFAULT_PATTERNS_TO_SANITIZE : patternsToSanitize);
-      keysToSanitize.addAll(kafkaConfigKeysToSanitize());
-      setKeysToSanitize(keysToSanitize.toArray(new String[] {}));
-    }
+    this.sanitizeKeysPatterns = enabled
+        ? compile(patternsToSanitize.isEmpty() ? DEFAULT_PATTERNS_TO_SANITIZE : patternsToSanitize)
+        : List.of();
   }

+  private static List<Pattern> compile(Collection<String> patternStrings) {
+    return patternStrings.stream()
+        .map(p -> isRegex(p)
+            ? Pattern.compile(p, CASE_INSENSITIVE)
+            : Pattern.compile(".*" + p + "$", CASE_INSENSITIVE))
+        .toList();
+  }
+
+  private static boolean isRegex(String str) {
+    return Arrays.stream(REGEX_PARTS).anyMatch(str::contains);
+  }
+
   private static Set<String> kafkaConfigKeysToSanitize() {

@@ -45,4 +65,17 @@ class KafkaConfigSanitizer {
         .collect(Collectors.toSet());
   }

+  public Object sanitize(String key, Object value) {
+    if (value == null) {
+      return null;
+    }
+    for (Pattern pattern : sanitizeKeysPatterns) {
+      if (pattern.matcher(key).matches()) {
+        return SANITIZED_VALUE;
+      }
+    }
+    return value;
+  }
+
 }
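With the configurable actuator Sanitizer removed in Spring Boot 3, the class above becomes self-contained: plain keys are wrapped into a ".*key$" regex, strings containing regex metacharacters (*, $, ^, +) are compiled as-is, and sanitize masks the value of any matching key. A short usage sketch, mirroring the updated tests further down in this compare:

// enabled, default pattern list
var sanitizer = new KafkaConfigSanitizer(true, List.of());
sanitizer.sanitize("sasl.jaas.config", "secret");       // -> "******" (kafka config key marked sensitive)
sanitizer.sanitize("basic.auth.user.info", "user:pw");  // -> "******" (default pattern)
sanitizer.sanitize("security.protocol", "SASL_SSL");    // -> "SASL_SSL" (left unchanged)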
@@ -225,11 +225,11 @@ public class KafkaConnectService {
   }

   public Mono<ConnectorDTO> setConnectorConfig(KafkaCluster cluster, String connectName,
-                                               String connectorName, Mono<Object> requestBody) {
+                                               String connectorName, Mono<Map<String, Object>> requestBody) {
     return api(cluster, connectName)
         .mono(c ->
             requestBody
-                .flatMap(body -> c.setConnectorConfig(connectorName, (Map<String, Object>) body))
+                .flatMap(body -> c.setConnectorConfig(connectorName, body))
                 .map(kafkaConnectMapper::fromClient));
   }

@@ -298,12 +298,12 @@ public class KafkaConnectService {
   }

   public Mono<ConnectorPluginConfigValidationResponseDTO> validateConnectorPluginConfig(
-      KafkaCluster cluster, String connectName, String pluginName, Mono<Object> requestBody) {
+      KafkaCluster cluster, String connectName, String pluginName, Mono<Map<String, Object>> requestBody) {
     return api(cluster, connectName)
         .mono(client ->
             requestBody
                 .flatMap(body ->
-                    client.validateConnectorPluginConfig(pluginName, (Map<String, Object>) body))
+                    client.validateConnectorPluginConfig(pluginName, body))
                 .map(kafkaConnectMapper::fromClient)
         );
   }
@@ -4,6 +4,7 @@ import static java.util.stream.Collectors.toList;
 import static java.util.stream.Collectors.toMap;
 import static org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo;

+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableTable;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Table;

@@ -498,6 +499,14 @@ public class ReactiveAdminClient implements Closeable {
         .flatMap(parts -> listOffsetsUnsafe(parts, offsetSpec));
   }

+  /**
+   * List offset for the specified topics, skipping no-leader partitions.
+   */
+  public Mono<Map<TopicPartition, Long>> listOffsets(Collection<TopicDescription> topicDescriptions,
+                                                     OffsetSpec offsetSpec) {
+    return listOffsetsUnsafe(filterPartitionsWithLeaderCheck(topicDescriptions, p -> true, false), offsetSpec);
+  }
+
   private Mono<Collection<TopicPartition>> filterPartitionsWithLeaderCheck(Collection<TopicPartition> partitions,
                                                                            boolean failOnUnknownLeader) {
     var targetTopics = partitions.stream().map(TopicPartition::topic).collect(Collectors.toSet());

@@ -507,34 +516,44 @@ public class ReactiveAdminClient implements Closeable {
             descriptions.values(), partitions::contains, failOnUnknownLeader));
   }

-  private Set<TopicPartition> filterPartitionsWithLeaderCheck(Collection<TopicDescription> topicDescriptions,
+  @VisibleForTesting
+  static Set<TopicPartition> filterPartitionsWithLeaderCheck(Collection<TopicDescription> topicDescriptions,
                                                               Predicate<TopicPartition> partitionPredicate,
                                                               boolean failOnUnknownLeader) {
     var goodPartitions = new HashSet<TopicPartition>();
     for (TopicDescription description : topicDescriptions) {
+      var goodTopicPartitions = new ArrayList<TopicPartition>();
       for (TopicPartitionInfo partitionInfo : description.partitions()) {
         TopicPartition topicPartition = new TopicPartition(description.name(), partitionInfo.partition());
-        if (!partitionPredicate.test(topicPartition)) {
-          continue;
+        if (partitionInfo.leader() == null) {
+          if (failOnUnknownLeader) {
+            throw new ValidationException(String.format("Topic partition %s has no leader", topicPartition));
+          } else {
+            // if ANY of topic partitions has no leader - we have to skip all topic partitions
+            goodTopicPartitions.clear();
+            break;
+          }
         }
-        if (partitionInfo.leader() != null) {
-          goodPartitions.add(topicPartition);
-        } else if (failOnUnknownLeader) {
-          throw new ValidationException(String.format("Topic partition %s has no leader", topicPartition));
+        if (partitionPredicate.test(topicPartition)) {
+          goodTopicPartitions.add(topicPartition);
         }
       }
+      goodPartitions.addAll(goodTopicPartitions);
     }
     return goodPartitions;
   }

-  // 1. NOTE(!): should only apply for partitions with existing leader,
+  // 1. NOTE(!): should only apply for partitions from topics where all partitions have leaders,
   // otherwise AdminClient will try to fetch topic metadata, fail and retry infinitely (until timeout)
   // 2. NOTE(!): Skips partitions that were not initialized yet
   //    (UnknownTopicOrPartitionException thrown, ex. after topic creation)
   // 3. TODO: check if it is a bug that AdminClient never throws LeaderNotAvailableException and just retrying instead
   @KafkaClientInternalsDependant
-  public Mono<Map<TopicPartition, Long>> listOffsetsUnsafe(Collection<TopicPartition> partitions,
-                                                           OffsetSpec offsetSpec) {
+  @VisibleForTesting
+  Mono<Map<TopicPartition, Long>> listOffsetsUnsafe(Collection<TopicPartition> partitions, OffsetSpec offsetSpec) {
     if (partitions.isEmpty()) {
       return Mono.just(Map.of());
     }

     Function<Collection<TopicPartition>, Mono<Map<TopicPartition, Long>>> call =
         parts -> {
@@ -3,6 +3,7 @@ package com.provectus.kafka.ui.service;
 import static java.util.stream.Collectors.toList;
 import static java.util.stream.Collectors.toMap;

+import com.google.common.collect.Sets;
 import com.provectus.kafka.ui.config.ClustersProperties;
 import com.provectus.kafka.ui.exception.TopicMetadataException;
 import com.provectus.kafka.ui.exception.TopicNotFoundException;

@@ -136,22 +137,14 @@ public class TopicsService {
   }

   private Mono<InternalPartitionsOffsets> getPartitionOffsets(Map<String, TopicDescription>
-                                                                  descriptions,
+                                                                  descriptionsMap,
                                                               ReactiveAdminClient ac) {
-    var topicPartitions = descriptions.values().stream()
-        .flatMap(desc ->
-            desc.partitions().stream()
-                // list offsets should only be applied to partitions with existing leader
-                // (see ReactiveAdminClient.listOffsetsUnsafe(..) docs)
-                .filter(tp -> tp.leader() != null)
-                .map(p -> new TopicPartition(desc.name(), p.partition())))
-        .collect(toList());
-
-    return ac.listOffsetsUnsafe(topicPartitions, OffsetSpec.earliest())
-        .zipWith(ac.listOffsetsUnsafe(topicPartitions, OffsetSpec.latest()),
+    var descriptions = descriptionsMap.values();
+    return ac.listOffsets(descriptions, OffsetSpec.earliest())
+        .zipWith(ac.listOffsets(descriptions, OffsetSpec.latest()),
             (earliest, latest) ->
-                topicPartitions.stream()
-                    .filter(tp -> earliest.containsKey(tp) && latest.containsKey(tp))
+                Sets.intersection(earliest.keySet(), latest.keySet())
+                    .stream()
                     .map(tp ->
                         Map.entry(tp,
                             new InternalPartitionsOffsets.Offsets(
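Guava's Sets.intersection used above returns an unmodifiable live view backed by the two key sets rather than copying them, so streaming over the partitions present in both offset maps stays allocation-light. Illustration only:

Set<Integer> earliestKeys = Set.of(1, 2, 3);
Set<Integer> latestKeys = Set.of(2, 3, 4);
Sets.intersection(earliestKeys, latestKeys); // view containing exactly 2 and 3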
@@ -2,7 +2,7 @@ package com.provectus.kafka.ui.service.analyze;

 import com.provectus.kafka.ui.model.TopicAnalysisSizeStatsDTO;
 import com.provectus.kafka.ui.model.TopicAnalysisStatsDTO;
-import com.provectus.kafka.ui.model.TopicAnalysisStatsHourlyMsgCountsDTO;
+import com.provectus.kafka.ui.model.TopicAnalysisStatsHourlyMsgCountsInnerDTO;
 import java.time.Duration;
 import java.time.Instant;
 import java.util.Comparator;

@@ -78,10 +78,10 @@ class TopicAnalysisStats {
       }
     }

-    List<TopicAnalysisStatsHourlyMsgCountsDTO> toDto() {
+    List<TopicAnalysisStatsHourlyMsgCountsInnerDTO> toDto() {
       return hourlyStats.entrySet().stream()
           .sorted(Comparator.comparingLong(Map.Entry::getKey))
-          .map(e -> new TopicAnalysisStatsHourlyMsgCountsDTO()
+          .map(e -> new TopicAnalysisStatsHourlyMsgCountsInnerDTO()
               .hourStart(e.getKey())
               .count(e.getValue()))
           .collect(Collectors.toList());
@@ -21,6 +21,7 @@ import com.provectus.kafka.ui.service.rbac.extractor.GithubAuthorityExtractor;
 import com.provectus.kafka.ui.service.rbac.extractor.GoogleAuthorityExtractor;
 import com.provectus.kafka.ui.service.rbac.extractor.LdapAuthorityExtractor;
 import com.provectus.kafka.ui.service.rbac.extractor.ProviderAuthorityExtractor;
+import jakarta.annotation.PostConstruct;
 import java.util.Collections;
 import java.util.List;
 import java.util.Set;

@@ -28,7 +29,6 @@ import java.util.function.Predicate;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 import javax.annotation.Nullable;
-import javax.annotation.PostConstruct;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.collections.CollectionUtils;
@@ -1,9 +1,9 @@
 package com.provectus.kafka.ui.service.rbac.extractor;

-import com.nimbusds.jose.shaded.json.JSONArray;
 import com.provectus.kafka.ui.model.rbac.Role;
 import com.provectus.kafka.ui.model.rbac.provider.Provider;
 import com.provectus.kafka.ui.service.rbac.AccessControlService;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;

@@ -44,7 +44,7 @@ public class CognitoAuthorityExtractor implements ProviderAuthorityExtractor {
         .map(Role::getName)
         .collect(Collectors.toSet());

-    JSONArray groups = principal.getAttribute(COGNITO_GROUPS_ATTRIBUTE_NAME);
+    List<String> groups = principal.getAttribute(COGNITO_GROUPS_ATTRIBUTE_NAME);
     if (groups == null) {
       log.debug("Cognito groups param is not present");
       return Mono.just(groupsByUsername);

@@ -56,9 +56,8 @@ public class CognitoAuthorityExtractor implements ProviderAuthorityExtractor {
             .stream()
             .filter(s -> s.getProvider().equals(Provider.OAUTH_COGNITO))
             .filter(s -> s.getType().equals("group"))
-            .anyMatch(subject -> Stream.of(groups.toArray())
+            .anyMatch(subject -> Stream.of(groups)
                 .map(Object::toString)
-                .distinct()
                 .anyMatch(cognitoGroup -> cognitoGroup.equals(subject.getValue()))
             ))
         .map(Role::getName)
@@ -16,7 +16,7 @@ import org.springframework.context.ApplicationContextInitializer;
 import org.springframework.context.ConfigurableApplicationContext;
 import org.springframework.test.context.ActiveProfiles;
 import org.springframework.test.context.ContextConfiguration;
-import org.springframework.util.SocketUtils;
+import org.springframework.test.util.TestSocketUtils;
 import org.testcontainers.containers.KafkaContainer;
 import org.testcontainers.containers.Network;
 import org.testcontainers.utility.DockerImageName;

@@ -61,7 +61,7 @@ public abstract class AbstractIntegrationTest {
     System.setProperty("kafka.clusters.0.bootstrapServers", kafka.getBootstrapServers());
     // List unavailable hosts to verify failover
     System.setProperty("kafka.clusters.0.schemaRegistry", String.format("http://localhost:%1$s,http://localhost:%1$s,%2$s",
-        SocketUtils.findAvailableTcpPort(), schemaRegistry.getUrl()));
+        TestSocketUtils.findAvailableTcpPort(), schemaRegistry.getUrl()));
     System.setProperty("kafka.clusters.0.kafkaConnect.0.name", "kafka-connect");
     System.setProperty("kafka.clusters.0.kafkaConnect.0.userName", "kafka-connect");
     System.setProperty("kafka.clusters.0.kafkaConnect.0.password", "kafka-connect");
@@ -5,13 +5,12 @@ import static org.assertj.core.api.Assertions.assertThat;
 import java.util.Arrays;
 import java.util.Collections;
 import org.junit.jupiter.api.Test;
-import org.springframework.boot.actuate.endpoint.Sanitizer;

 class KafkaConfigSanitizerTest {

   @Test
   void doNothingIfEnabledPropertySetToFalse() {
-    final Sanitizer sanitizer = new KafkaConfigSanitizer(false, Collections.emptyList());
+    final var sanitizer = new KafkaConfigSanitizer(false, Collections.emptyList());
     assertThat(sanitizer.sanitize("password", "secret")).isEqualTo("secret");
     assertThat(sanitizer.sanitize("sasl.jaas.config", "secret")).isEqualTo("secret");
     assertThat(sanitizer.sanitize("database.password", "secret")).isEqualTo("secret");

@@ -19,7 +18,7 @@ class KafkaConfigSanitizerTest {

   @Test
   void obfuscateCredentials() {
-    final Sanitizer sanitizer = new KafkaConfigSanitizer(true, Collections.emptyList());
+    final var sanitizer = new KafkaConfigSanitizer(true, Collections.emptyList());
     assertThat(sanitizer.sanitize("sasl.jaas.config", "secret")).isEqualTo("******");
     assertThat(sanitizer.sanitize("consumer.sasl.jaas.config", "secret")).isEqualTo("******");
     assertThat(sanitizer.sanitize("producer.sasl.jaas.config", "secret")).isEqualTo("******");

@@ -37,7 +36,7 @@ class KafkaConfigSanitizerTest {

   @Test
   void notObfuscateNormalConfigs() {
-    final Sanitizer sanitizer = new KafkaConfigSanitizer(true, Collections.emptyList());
+    final var sanitizer = new KafkaConfigSanitizer(true, Collections.emptyList());
     assertThat(sanitizer.sanitize("security.protocol", "SASL_SSL")).isEqualTo("SASL_SSL");
     final String[] bootstrapServer = new String[] {"test1:9092", "test2:9092"};
     assertThat(sanitizer.sanitize("bootstrap.servers", bootstrapServer)).isEqualTo(bootstrapServer);

@@ -45,7 +44,7 @@ class KafkaConfigSanitizerTest {

   @Test
   void obfuscateCredentialsWithDefinedPatterns() {
-    final Sanitizer sanitizer = new KafkaConfigSanitizer(true, Arrays.asList("kafka.ui", ".*test.*"));
+    final var sanitizer = new KafkaConfigSanitizer(true, Arrays.asList("kafka.ui", ".*test.*"));
     assertThat(sanitizer.sanitize("consumer.kafka.ui", "secret")).isEqualTo("******");
     assertThat(sanitizer.sanitize("this.is.test.credentials", "secret")).isEqualTo("******");
     assertThat(sanitizer.sanitize("this.is.not.credential", "not.credential"))
@@ -4,8 +4,11 @@ import static com.provectus.kafka.ui.service.ReactiveAdminClient.toMonoWithExcep
 import static java.util.Objects.requireNonNull;
 import static org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo;
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.assertj.core.api.ThrowableAssert.ThrowingCallable;

 import com.provectus.kafka.ui.AbstractIntegrationTest;
+import com.provectus.kafka.ui.exception.ValidationException;
 import com.provectus.kafka.ui.producer.KafkaTestProducer;
 import java.time.Duration;
 import java.util.ArrayList;

@@ -22,16 +25,20 @@ import org.apache.kafka.clients.admin.Config;
 import org.apache.kafka.clients.admin.ConfigEntry;
 import org.apache.kafka.clients.admin.NewTopic;
 import org.apache.kafka.clients.admin.OffsetSpec;
+import org.apache.kafka.clients.admin.TopicDescription;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.TopicPartitionInfo;
 import org.apache.kafka.common.config.ConfigResource;
 import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
 import org.apache.kafka.common.internals.KafkaFutureImpl;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.assertj.core.api.ThrowableAssert;
 import org.junit.function.ThrowingRunnable;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;

@@ -133,6 +140,56 @@ class ReactiveAdminClientTest extends AbstractIntegrationTest {
         .verifyComplete();
   }

+  @Test
+  void filterPartitionsWithLeaderCheckSkipsPartitionsFromTopicWhereSomePartitionsHaveNoLeader() {
+    var filteredPartitions = ReactiveAdminClient.filterPartitionsWithLeaderCheck(
+        List.of(
+            // contains partitions with no leader
+            new TopicDescription("noLeaderTopic", false,
+                List.of(
+                    new TopicPartitionInfo(0, new Node(1, "n1", 9092), List.of(), List.of()),
+                    new TopicPartitionInfo(1, null, List.of(), List.of()))),
+            // should be skipped by predicate
+            new TopicDescription("skippingByPredicate", false,
+                List.of(
+                    new TopicPartitionInfo(0, new Node(1, "n1", 9092), List.of(), List.of()))),
+            // good topic
+            new TopicDescription("good", false,
+                List.of(
+                    new TopicPartitionInfo(0, new Node(1, "n1", 9092), List.of(), List.of()),
+                    new TopicPartitionInfo(1, new Node(2, "n2", 9092), List.of(), List.of()))
+            )),
+        p -> !p.topic().equals("skippingByPredicate"),
+        false
+    );
+
+    assertThat(filteredPartitions)
+        .containsExactlyInAnyOrder(
+            new TopicPartition("good", 0),
+            new TopicPartition("good", 1)
+        );
+  }
+
+  @Test
+  void filterPartitionsWithLeaderCheckThrowExceptionIfThereIsSomePartitionsWithoutLeaderAndFlagSet() {
+    ThrowingCallable call = () -> ReactiveAdminClient.filterPartitionsWithLeaderCheck(
+        List.of(
+            // contains partitions with no leader
+            new TopicDescription("t1", false,
+                List.of(
+                    new TopicPartitionInfo(0, new Node(1, "n1", 9092), List.of(), List.of()),
+                    new TopicPartitionInfo(1, null, List.of(), List.of()))),
+            new TopicDescription("t2", false,
+                List.of(
+                    new TopicPartitionInfo(0, new Node(1, "n1", 9092), List.of(), List.of()))
+            )),
+        p -> true,
+        // setting failOnNoLeader flag
+        true
+    );
+    assertThatThrownBy(call).isInstanceOf(ValidationException.class);
+  }
+
   @Test
   void testListOffsetsUnsafe() {
     String topic = UUID.randomUUID().toString();
@@ -15,7 +15,6 @@ import java.util.concurrent.CopyOnWriteArraySet;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
-import org.springframework.util.unit.DataSize;
 import org.testcontainers.utility.DockerImageName;

 class KsqlServiceV2Test extends AbstractIntegrationTest {

@@ -27,8 +26,6 @@ class KsqlServiceV2Test extends AbstractIntegrationTest {
   private static final Set<String> STREAMS_TO_DELETE = new CopyOnWriteArraySet<>();
   private static final Set<String> TABLES_TO_DELETE = new CopyOnWriteArraySet<>();

-  private static final DataSize maxBuffSize = DataSize.ofMegabytes(20);
-
   @BeforeAll
   static void init() {
     KSQL_DB.start();
@@ -27,20 +27,24 @@
       <artifactId>spring-boot-starter-validation</artifactId>
     </dependency>
     <dependency>
-      <groupId>io.swagger</groupId>
-      <artifactId>swagger-annotations</artifactId>
-      <version>${swagger-annotations.version}</version>
+      <groupId>io.swagger.core.v3</groupId>
+      <artifactId>swagger-integration-jakarta</artifactId>
+      <version>2.2.8</version>
     </dependency>
     <dependency>
       <groupId>org.openapitools</groupId>
       <artifactId>jackson-databind-nullable</artifactId>
-      <version>${jackson-databind-nullable.version}</version>
+      <version>0.2.4</version>
     </dependency>
     <dependency>
-      <groupId>com.google.code.findbugs</groupId>
-      <artifactId>jsr305</artifactId>
-      <version>3.0.2</version>
-      <scope>provided</scope>
+      <groupId>jakarta.annotation</groupId>
+      <artifactId>jakarta.annotation-api</artifactId>
+      <version>2.1.1</version>
+    </dependency>
+    <dependency>
+      <groupId>javax.annotation</groupId>
+      <artifactId>javax.annotation-api</artifactId>
+      <version>1.3.2</version>
     </dependency>
   </dependencies>

@@ -71,6 +75,7 @@
               <library>webclient</library>
               <useBeanValidation>true</useBeanValidation>
               <dateLibrary>java8</dateLibrary>
+              <useJakartaEe>true</useJakartaEe>
             </configOptions>
           </configuration>
         </execution>

@@ -80,8 +85,7 @@
             <goal>generate</goal>
           </goals>
           <configuration>
-            <inputSpec>${project.basedir}/src/main/resources/swagger/kafka-ui-api.yaml
-            </inputSpec>
+            <inputSpec>${project.basedir}/src/main/resources/swagger/kafka-ui-api.yaml</inputSpec>
             <output>${project.build.directory}/generated-sources/api</output>
             <generatorName>spring</generatorName>
             <modelNameSuffix>DTO</modelNameSuffix>

@@ -89,14 +93,12 @@
             <modelPackage>com.provectus.kafka.ui.model</modelPackage>
             <apiPackage>com.provectus.kafka.ui.api</apiPackage>
             <sourceFolder>kafka-ui-contract</sourceFolder>
-
             <reactive>true</reactive>
-
             <interfaceOnly>true</interfaceOnly>
             <skipDefaultInterface>true</skipDefaultInterface>
             <useBeanValidation>true</useBeanValidation>
             <useTags>true</useTags>
-
+            <useSpringBoot3>true</useSpringBoot3>
             <dateLibrary>java8</dateLibrary>
           </configOptions>
           <typeMappings>

@@ -116,15 +118,13 @@
           <generatorName>java</generatorName>
           <generateApiTests>false</generateApiTests>
           <generateModelTests>false</generateModelTests>
-
           <configOptions>
             <modelPackage>com.provectus.kafka.ui.connect.model</modelPackage>
             <apiPackage>com.provectus.kafka.ui.connect.api</apiPackage>
             <sourceFolder>kafka-connect-client</sourceFolder>
-
             <asyncNative>true</asyncNative>
             <library>webclient</library>
-
+            <useJakartaEe>true</useJakartaEe>
             <useBeanValidation>true</useBeanValidation>
             <dateLibrary>java8</dateLibrary>
           </configOptions>

@@ -142,15 +142,13 @@
           <generatorName>java</generatorName>
           <generateApiTests>false</generateApiTests>
           <generateModelTests>false</generateModelTests>
-
           <configOptions>
             <modelPackage>com.provectus.kafka.ui.sr.model</modelPackage>
             <apiPackage>com.provectus.kafka.ui.sr.api</apiPackage>
             <sourceFolder>kafka-sr-client</sourceFolder>
-
             <asyncNative>true</asyncNative>
             <library>webclient</library>
-
+            <useJakartaEe>true</useJakartaEe>
             <useBeanValidation>true</useBeanValidation>
             <dateLibrary>java8</dateLibrary>
           </configOptions>
@@ -2387,6 +2387,10 @@ components:
           - UNKNOWN

     ConsumerGroup:
+      discriminator:
+        propertyName: inherit
+        mapping:
+          details: "#/components/schemas/ConsumerGroupDetails"
       type: object
       properties:
         groupId:
@@ -231,9 +231,13 @@ const Filters: React.FC<FiltersProps> = ({
       props.seekType = SeekType.TIMESTAMP;
     }

+    const isSeekTypeWithSeekTo =
+      props.seekType === SeekType.TIMESTAMP ||
+      props.seekType === SeekType.OFFSET;
+
     if (
       selectedPartitions.length !== partitions.length ||
-      currentSeekType === SeekType.TIMESTAMP
+      isSeekTypeWithSeekTo
     ) {
       // not everything in the partition is selected
       props.seekTo = selectedPartitions.map(({ value }) => {

@@ -323,7 +327,9 @@ const Filters: React.FC<FiltersProps> = ({
   // eslint-disable-next-line consistent-return
   React.useEffect(() => {
     if (location.search?.length !== 0) {
-      const url = `${BASE_PARAMS.basePath}/api/clusters/${clusterName}/topics/${topicName}/messages${location.search}`;
+      const url = `${BASE_PARAMS.basePath}/api/clusters/${encodeURIComponent(
+        clusterName
+      )}/topics/${topicName}/messages${location.search}`;
       const sse = new EventSource(url);

       source.current = sse;
@@ -49,7 +49,7 @@ const CustomParamField: React.FC<Props> = ({
     label: option,
     disabled:
       (config &&
-        config[option].source !== ConfigSource.DYNAMIC_TOPIC_CONFIG) ||
+        config[option]?.source !== ConfigSource.DYNAMIC_TOPIC_CONFIG) ||
       existingFields.includes(option),
   }));
@@ -90,7 +90,9 @@ export const useKsqlkDbSSE = ({ clusterName, pipeId }: UseKsqlkDbSSEProps) => {

   React.useEffect(() => {
     const fetchData = async () => {
-      const url = `${BASE_PARAMS.basePath}/api/clusters/${clusterName}/ksql/response`;
+      const url = `${BASE_PARAMS.basePath}/api/clusters/${encodeURIComponent(
+        clusterName
+      )}/ksql/response`;
       await fetchEventSource(
         `${url}?${new URLSearchParams({ pipeId: pipeId || '' }).toString()}`,
         {
@@ -51,7 +51,9 @@ export const useTopicMessages = ({
   React.useEffect(() => {
     const fetchData = async () => {
       setIsFetching(true);
-      const url = `${BASE_PARAMS.basePath}/api/clusters/${clusterName}/topics/${topicName}/messages`;
+      const url = `${BASE_PARAMS.basePath}/api/clusters/${encodeURIComponent(
+        clusterName
+      )}/topics/${topicName}/messages`;
       const requestParams = new URLSearchParams({
         limit,
         seekTo: seekTo.replaceAll('-', '::').replaceAll('.', ','),

pom.xml (30 changes)
@@ -30,17 +30,13 @@
     <datasketches-java.version>3.1.0</datasketches-java.version>
     <groovy.version>3.0.13</groovy.version>
     <jackson.version>2.14.0</jackson.version>
-    <jackson-databind-nullable.version>0.2.4</jackson-databind-nullable.version>
     <kafka-clients.version>3.3.1</kafka-clients.version>
-    <netty.version>4.1.85.Final</netty.version>
     <org.mapstruct.version>1.4.2.Final</org.mapstruct.version>
     <org.projectlombok.version>1.18.24</org.projectlombok.version>
     <protobuf-java.version>3.21.9</protobuf-java.version>
-    <reactor-netty.version>1.1.0</reactor-netty.version>
     <scala-lang.library.version>2.13.9</scala-lang.library.version>
-    <snakeyaml.version>1.33</snakeyaml.version>
-    <spring-boot.version>2.7.5</spring-boot.version>
-    <spring-security.version>5.7.5</spring-security.version>
+    <snakeyaml.version>2.0</snakeyaml.version>
+    <spring-boot.version>3.0.5</spring-boot.version>
     <kafka-ui-serde-api.version>1.0.0</kafka-ui-serde-api.version>
     <odd-oddrn-generator.version>0.1.15</odd-oddrn-generator.version>
     <odd-oddrn-client.version>0.1.23</odd-oddrn-client.version>

@@ -62,9 +58,8 @@
     <maven-compiler-plugin.version>3.10.1</maven-compiler-plugin.version>
     <maven-resources-plugin.version>3.2.0</maven-resources-plugin.version>
     <maven-surefire-plugin.version>2.22.2</maven-surefire-plugin.version>
-    <openapi-generator-maven-plugin.version>4.3.0</openapi-generator-maven-plugin.version>
+    <openapi-generator-maven-plugin.version>6.5.0</openapi-generator-maven-plugin.version>
     <springdoc-openapi-webflux-ui.version>1.2.32</springdoc-openapi-webflux-ui.version>
-    <swagger-annotations.version>1.6.0</swagger-annotations.version>
   </properties>

   <repositories>

@@ -111,20 +106,6 @@
         <type>pom</type>
         <scope>import</scope>
       </dependency>
-      <dependency>
-        <groupId>org.springframework.security</groupId>
-        <artifactId>spring-security-bom</artifactId>
-        <version>${spring-security.version}</version>
-        <type>pom</type>
-        <scope>import</scope>
-      </dependency>
-      <dependency>
-        <groupId>io.netty</groupId>
-        <artifactId>netty-bom</artifactId>
-        <version>${netty.version}</version>
-        <type>pom</type>
-        <scope>import</scope>
-      </dependency>
       <dependency>
         <groupId>com.fasterxml.jackson</groupId>
         <artifactId>jackson-bom</artifactId>

@@ -147,11 +128,6 @@
         <artifactId>protobuf-java</artifactId>
         <version>${protobuf-java.version}</version>
       </dependency>
-      <dependency>
-        <groupId>io.projectreactor.netty</groupId>
-        <artifactId>reactor-netty-http</artifactId>
-        <version>${reactor-netty.version}</version>
-      </dependency>
      <dependency>
        <groupId>org.junit</groupId>
        <artifactId>junit-bom</artifactId>