Merge branch 'master' into tarun_samanta/#3944

commit 539fccaa9e
78 changed files with 2305 additions and 2133 deletions
.github/workflows/aws_publisher.yaml (vendored), 2 changes

@@ -31,7 +31,7 @@ jobs:
          echo "Packer will be triggered in this dir $WORK_DIR"

      - name: Configure AWS credentials for Kafka-UI account
-       uses: aws-actions/configure-aws-credentials@v2
+       uses: aws-actions/configure-aws-credentials@v3
        with:
          aws-access-key-id: ${{ secrets.AWS_AMI_PUBLISH_KEY_ID }}
          aws-secret-access-key: ${{ secrets.AWS_AMI_PUBLISH_KEY_SECRET }}
.github/workflows/branch-deploy.yml (vendored), 2 changes

@@ -45,7 +45,7 @@ jobs:
          restore-keys: |
            ${{ runner.os }}-buildx-
      - name: Configure AWS credentials for Kafka-UI account
-       uses: aws-actions/configure-aws-credentials@v2
+       uses: aws-actions/configure-aws-credentials@v3
        with:
          aws-access-key-id: ${{ secrets.AWS_ACCESS_KEY_ID }}
          aws-secret-access-key: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
.github/workflows/build-public-image.yml (vendored), 2 changes

@@ -42,7 +42,7 @@ jobs:
          restore-keys: |
            ${{ runner.os }}-buildx-
      - name: Configure AWS credentials for Kafka-UI account
-       uses: aws-actions/configure-aws-credentials@v2
+       uses: aws-actions/configure-aws-credentials@v3
        with:
          aws-access-key-id: ${{ secrets.AWS_ACCESS_KEY_ID }}
          aws-secret-access-key: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
.github/workflows/delete-public-image.yml (vendored), 2 changes

@@ -15,7 +15,7 @@ jobs:
          tag='${{ github.event.pull_request.number }}'
          echo "tag=${tag}" >> $GITHUB_OUTPUT
      - name: Configure AWS credentials for Kafka-UI account
-       uses: aws-actions/configure-aws-credentials@v2
+       uses: aws-actions/configure-aws-credentials@v3
        with:
          aws-access-key-id: ${{ secrets.AWS_ACCESS_KEY_ID }}
          aws-secret-access-key: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
.github/workflows/e2e-automation.yml (vendored), 2 changes

@@ -24,7 +24,7 @@ jobs:
        with:
          ref: ${{ github.sha }}
      - name: Configure AWS credentials
-       uses: aws-actions/configure-aws-credentials@v2
+       uses: aws-actions/configure-aws-credentials@v3
        with:
          aws-access-key-id: ${{ secrets.AWS_ACCESS_KEY_ID }}
          aws-secret-access-key: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
.github/workflows/e2e-checks.yaml (vendored), 2 changes

@@ -18,7 +18,7 @@ jobs:
        with:
          ref: ${{ github.event.pull_request.head.sha }}
      - name: Configure AWS credentials
-       uses: aws-actions/configure-aws-credentials@v2
+       uses: aws-actions/configure-aws-credentials@v3
        with:
          aws-access-key-id: ${{ secrets.S3_AWS_ACCESS_KEY_ID }}
          aws-secret-access-key: ${{ secrets.S3_AWS_SECRET_ACCESS_KEY }}
.github/workflows/e2e-weekly.yml (vendored), 2 changes

@@ -11,7 +11,7 @@ jobs:
        with:
          ref: ${{ github.sha }}
      - name: Configure AWS credentials
-       uses: aws-actions/configure-aws-credentials@v2
+       uses: aws-actions/configure-aws-credentials@v3
        with:
          aws-access-key-id: ${{ secrets.AWS_ACCESS_KEY_ID }}
          aws-secret-access-key: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
.github/workflows/frontend.yaml (vendored), 6 changes

@@ -25,11 +25,11 @@ jobs:
          ref: ${{ github.event.pull_request.head.sha }}
      - uses: pnpm/action-setup@v2.4.0
        with:
-         version: 7.4.0
+         version: 8.6.12
      - name: Install node
-       uses: actions/setup-node@v3.7.0
+       uses: actions/setup-node@v3.8.1
        with:
-         node-version: "16.15.0"
+         node-version: "18.17.1"
          cache: "pnpm"
          cache-dependency-path: "./kafka-ui-react-app/pnpm-lock.yaml"
      - name: Install Node dependencies

@@ -47,7 +47,7 @@ jobs:
          restore-keys: |
            ${{ runner.os }}-buildx-
      - name: Configure AWS credentials for Kafka-UI account
-       uses: aws-actions/configure-aws-credentials@v2
+       uses: aws-actions/configure-aws-credentials@v3
        with:
          aws-access-key-id: ${{ secrets.AWS_ACCESS_KEY_ID }}
          aws-secret-access-key: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
.github/workflows/terraform-deploy.yml (vendored), 2 changes

@@ -26,7 +26,7 @@ jobs:
          echo "Terraform will be triggered in this dir $TF_DIR"

      - name: Configure AWS credentials for Kafka-UI account
-       uses: aws-actions/configure-aws-credentials@v2
+       uses: aws-actions/configure-aws-credentials@v3
        with:
          aws-access-key-id: ${{ secrets.AWS_ACCESS_KEY_ID }}
          aws-secret-access-key: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
@@ -7,8 +7,6 @@ import org.springframework.http.HttpMethod;
import org.springframework.http.HttpStatus;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.http.server.reactive.ServerHttpResponse;
import org.springframework.web.reactive.config.CorsRegistry;
import org.springframework.web.reactive.config.WebFluxConfigurer;
import org.springframework.web.server.ServerWebExchange;
import org.springframework.web.server.WebFilter;
import org.springframework.web.server.WebFilterChain;
@@ -1,7 +1,6 @@
package com.provectus.kafka.ui.config;

import com.provectus.kafka.ui.exception.ValidationException;
import java.beans.Transient;
import javax.annotation.PostConstruct;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
@@ -1,7 +1,6 @@
package com.provectus.kafka.ui.config.auth;

import java.util.Collection;
import lombok.Value;

public record AuthenticatedUser(String principal, Collection<String> groups) {
@@ -6,11 +6,13 @@ import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.HttpMethod;
import org.springframework.security.config.annotation.web.reactive.EnableWebFluxSecurity;
import org.springframework.security.config.web.server.ServerHttpSecurity;
import org.springframework.security.web.server.SecurityWebFilterChain;
import org.springframework.security.web.server.authentication.RedirectServerAuthenticationSuccessHandler;
import org.springframework.security.web.server.authentication.logout.RedirectServerLogoutSuccessHandler;
import org.springframework.security.web.server.util.matcher.ServerWebExchangeMatchers;

@Configuration
@EnableWebFluxSecurity

@@ -39,7 +41,9 @@ public class BasicAuthSecurityConfig extends AbstractAuthSecurityConfig {
            .authenticated()
        )
        .formLogin(spec -> spec.loginPage(LOGIN_URL).authenticationSuccessHandler(authHandler))
-       .logout(spec -> spec.logoutSuccessHandler(logoutSuccessHandler))
+       .logout(spec -> spec
+           .logoutSuccessHandler(logoutSuccessHandler)
+           .requiresLogout(ServerWebExchangeMatchers.pathMatchers(HttpMethod.GET, "/logout")))
        .csrf(ServerHttpSecurity.CsrfSpec::disable)
        .build();
  }
@@ -2,7 +2,6 @@ package com.provectus.kafka.ui.config.auth;

import java.util.Collection;
import java.util.Map;
import lombok.Value;
import org.springframework.security.core.GrantedAuthority;
import org.springframework.security.oauth2.core.user.OAuth2User;
@@ -2,7 +2,6 @@ package com.provectus.kafka.ui.config.auth;

import java.util.Collection;
import java.util.Map;
import lombok.Value;
import org.springframework.security.core.GrantedAuthority;
import org.springframework.security.oauth2.core.oidc.OidcIdToken;
import org.springframework.security.oauth2.core.oidc.OidcUserInfo;
@@ -13,7 +13,6 @@ import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;

@@ -38,7 +37,7 @@ public class AccessController implements AuthorizationApi {
            .filter(role -> user.groups().contains(role.getName()))
            .map(role -> mapPermissions(role.getPermissions(), role.getClusters()))
            .flatMap(Collection::stream)
-           .collect(Collectors.toList())
+           .toList()
        )
        .switchIfEmpty(Mono.just(Collections.emptyList()));

@@ -70,10 +69,10 @@ public class AccessController implements AuthorizationApi {
              .map(String::toUpperCase)
              .map(this::mapAction)
              .filter(Objects::nonNull)
-             .collect(Collectors.toList()));
+             .toList());
          return dto;
        })
-       .collect(Collectors.toList());
+       .toList();
  }

  @Nullable
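The recurring change in this commit from .collect(Collectors.toList()) to .toList() uses the Stream.toList() method introduced in Java 16; unlike the collector form, it returns an unmodifiable list. A minimal standalone sketch of the difference (illustrative only, not project code):

import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class ToListDemo {
  public static void main(String[] args) {
    // Collector form: typically a mutable ArrayList.
    List<Integer> mutable = Stream.of(1, 2, 3).collect(Collectors.toList());
    mutable.add(4); // allowed

    // Stream.toList() (Java 16+): unmodifiable result.
    List<Integer> frozen = Stream.of(1, 2, 3).toList();
    // frozen.add(4); // would throw UnsupportedOperationException
    System.out.println(mutable + " " + frozen);
  }
}

The swap appears safe in these controllers because the resulting lists are only handed on to DTO builders and never mutated afterwards.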
@@ -82,12 +82,13 @@ public class ApplicationConfigController extends AbstractController implements A
        .build();
    return validateAccess(context)
        .then(restartRequestDto)
-       .<ResponseEntity<Void>>map(dto -> {
-         dynamicConfigOperations.persist(MAPPER.fromDto(dto.getConfig().getProperties()));
-         restarter.requestRestart();
-         return ResponseEntity.ok().build();
+       .doOnNext(restartDto -> {
+         var newConfig = MAPPER.fromDto(restartDto.getConfig().getProperties());
+         dynamicConfigOperations.persist(newConfig);
        })
-       .doOnEach(sig -> audit(context, sig));
+       .doOnEach(sig -> audit(context, sig))
+       .doOnSuccess(dto -> restarter.requestRestart())
+       .map(dto -> ResponseEntity.ok().build());
  }

  @Override

@@ -116,8 +117,8 @@ public class ApplicationConfigController extends AbstractController implements A
    return validateAccess(context)
        .then(configDto)
        .flatMap(config -> {
-         PropertiesStructure propertiesStructure = MAPPER.fromDto(config.getProperties());
-         ClustersProperties clustersProperties = propertiesStructure.getKafka();
+         PropertiesStructure newConfig = MAPPER.fromDto(config.getProperties());
+         ClustersProperties clustersProperties = newConfig.getKafka();
          return validateClustersConfig(clustersProperties)
              .map(validations -> new ApplicationConfigValidationDTO().clusters(validations));
        })
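The restart hunk above splits one map callback into separate Reactor side-effect operators, so persisting the new config, auditing, and requesting the restart happen in that order rather than all inside a single map. A small self-contained sketch of that ordering (hypothetical names, not the project's classes):

import reactor.core.publisher.Mono;

class RestartOrderingDemo {
  public static void main(String[] args) {
    Mono.just("new-config")
        .doOnNext(cfg -> System.out.println("persisting " + cfg))      // runs for the emitted value
        .doOnSuccess(cfg -> System.out.println("requesting restart"))  // runs on successful completion
        .map(cfg -> "HTTP 200")
        .subscribe(System.out::println);
  }
}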
@@ -36,10 +36,10 @@ public class AuthController {
            + " <meta name=\"description\" content=\"\">\n"
            + " <meta name=\"author\" content=\"\">\n"
            + " <title>Please sign in</title>\n"
-           + " <link href=\"/static/css/bootstrap.min.css\" rel=\"stylesheet\" "
+           + " <link href=\"" + contextPath + "/static/css/bootstrap.min.css\" rel=\"stylesheet\" "
            + "integrity=\"sha384-/Y6pD6FV/Vv2HJnA6t+vslU6fwYXjCFtcEpHbNJ0lyAFsXTsjBbfaDjzALeQsN6M\" "
            + "crossorigin=\"anonymous\">\n"
-           + " <link href=\"/static/css/signin.css\" "
+           + " <link href=\"" + contextPath + "/static/css/signin.css\" "
            + "rel=\"stylesheet\" crossorigin=\"anonymous\"/>\n"
            + " </head>\n"
            + " <body>\n"
@@ -26,6 +26,8 @@ import reactor.core.publisher.Mono;
@RequiredArgsConstructor
@Slf4j
public class BrokersController extends AbstractController implements BrokersApi {
+  private static final String BROKER_ID = "brokerId";
+
  private final BrokerService brokerService;
  private final ClusterMapper clusterMapper;

@@ -89,7 +91,7 @@ public class BrokersController extends AbstractController implements BrokersApi
        .cluster(clusterName)
        .clusterConfigActions(ClusterConfigAction.VIEW)
        .operationName("getBrokerConfig")
-       .operationParams(Map.of("brokerId", id))
+       .operationParams(Map.of(BROKER_ID, id))
        .build();

    return validateAccess(context).thenReturn(

@@ -108,7 +110,7 @@ public class BrokersController extends AbstractController implements BrokersApi
        .cluster(clusterName)
        .clusterConfigActions(ClusterConfigAction.VIEW, ClusterConfigAction.EDIT)
        .operationName("updateBrokerTopicPartitionLogDir")
-       .operationParams(Map.of("brokerId", id))
+       .operationParams(Map.of(BROKER_ID, id))
        .build();

    return validateAccess(context).then(

@@ -128,7 +130,7 @@ public class BrokersController extends AbstractController implements BrokersApi
        .cluster(clusterName)
        .clusterConfigActions(ClusterConfigAction.VIEW, ClusterConfigAction.EDIT)
        .operationName("updateBrokerConfigByName")
-       .operationParams(Map.of("brokerId", id))
+       .operationParams(Map.of(BROKER_ID, id))
        .build();

    return validateAccess(context).then(
@@ -22,7 +22,6 @@ import com.provectus.kafka.ui.service.OffsetsResetService;
import java.util.Map;
import java.util.Optional;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;

@@ -200,7 +199,7 @@ public class ConsumerGroupsController extends AbstractController implements Cons
        .consumerGroups(consumerGroupConsumerGroupsPage.consumerGroups()
            .stream()
            .map(ConsumerGroupMapper::toDto)
-           .collect(Collectors.toList()));
+           .toList());
  }

}
@@ -36,6 +36,7 @@ import reactor.core.publisher.Mono;
public class KafkaConnectController extends AbstractController implements KafkaConnectApi {
  private static final Set<ConnectorActionDTO> RESTART_ACTIONS
      = Set.of(RESTART, RESTART_FAILED_TASKS, RESTART_ALL_TASKS);
+  private static final String CONNECTOR_NAME = "connectorName";

  private final KafkaConnectService kafkaConnectService;

@@ -112,7 +113,7 @@ public class KafkaConnectController extends AbstractController implements KafkaC
        .connect(connectName)
        .connectActions(ConnectAction.VIEW, ConnectAction.EDIT)
        .operationName("deleteConnector")
-       .operationParams(Map.of("connectorName", connectName))
+       .operationParams(Map.of(CONNECTOR_NAME, connectName))
        .build();

    return validateAccess(context).then(

@@ -180,7 +181,7 @@ public class KafkaConnectController extends AbstractController implements KafkaC
        .connect(connectName)
        .connectActions(ConnectAction.VIEW, ConnectAction.EDIT)
        .operationName("setConnectorConfig")
-       .operationParams(Map.of("connectorName", connectorName))
+       .operationParams(Map.of(CONNECTOR_NAME, connectorName))
        .build();

    return validateAccess(context).then(

@@ -207,7 +208,7 @@ public class KafkaConnectController extends AbstractController implements KafkaC
        .connect(connectName)
        .connectActions(connectActions)
        .operationName("updateConnectorState")
-       .operationParams(Map.of("connectorName", connectorName))
+       .operationParams(Map.of(CONNECTOR_NAME, connectorName))
        .build();

    return validateAccess(context).then(

@@ -227,7 +228,7 @@ public class KafkaConnectController extends AbstractController implements KafkaC
        .connect(connectName)
        .connectActions(ConnectAction.VIEW)
        .operationName("getConnectorTasks")
-       .operationParams(Map.of("connectorName", connectorName))
+       .operationParams(Map.of(CONNECTOR_NAME, connectorName))
        .build();

    return validateAccess(context).thenReturn(

@@ -247,7 +248,7 @@ public class KafkaConnectController extends AbstractController implements KafkaC
        .connect(connectName)
        .connectActions(ConnectAction.VIEW, ConnectAction.RESTART)
        .operationName("restartConnectorTask")
-       .operationParams(Map.of("connectorName", connectorName))
+       .operationParams(Map.of(CONNECTOR_NAME, connectorName))
        .build();

    return validateAccess(context).then(
@@ -15,7 +15,6 @@ import com.provectus.kafka.ui.model.rbac.permission.SchemaAction;
import com.provectus.kafka.ui.service.SchemaRegistryService;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import javax.validation.Valid;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;

@@ -235,7 +234,7 @@ public class SchemasController extends AbstractController implements SchemasApi
      List<String> subjectsToRender = filteredSubjects.stream()
          .skip(subjectToSkip)
          .limit(pageSize)
-         .collect(Collectors.toList());
+         .toList();
      return schemaRegistryService.getAllLatestVersionSchemas(getCluster(clusterName), subjectsToRender)
          .map(subjs -> subjs.stream().map(kafkaSrMapper::toDto).toList())
          .map(subjs -> new SchemaSubjectsResponseDTO().pageCount(totalPages).schemas(subjs));
@@ -22,6 +22,7 @@ import com.provectus.kafka.ui.model.TopicConfigDTO;
import com.provectus.kafka.ui.model.TopicCreationDTO;
import com.provectus.kafka.ui.model.TopicDTO;
import com.provectus.kafka.ui.model.TopicDetailsDTO;
+import com.provectus.kafka.ui.model.TopicProducerStateDTO;
import com.provectus.kafka.ui.model.TopicUpdateDTO;
import com.provectus.kafka.ui.model.TopicsResponseDTO;
import com.provectus.kafka.ui.model.rbac.AccessContext;

@@ -143,7 +144,7 @@ public class TopicsController extends AbstractController implements TopicsApi {
            .map(lst -> lst.stream()
                .map(InternalTopicConfig::from)
                .map(clusterMapper::toTopicConfig)
-               .collect(toList()))
+               .toList())
            .map(Flux::fromIterable)
            .map(ResponseEntity::ok)
        ).doOnEach(sig -> audit(context, sig));

@@ -207,7 +208,7 @@ public class TopicsController extends AbstractController implements TopicsApi {
          return topicsService.loadTopics(getCluster(clusterName), topicsPage)
              .map(topicsToRender ->
                  new TopicsResponseDTO()
-                     .topics(topicsToRender.stream().map(clusterMapper::toTopic).collect(toList()))
+                     .topics(topicsToRender.stream().map(clusterMapper::toTopic).toList())
                      .pageCount(totalPages));
        })
        .map(ResponseEntity::ok)

@@ -327,6 +328,34 @@ public class TopicsController extends AbstractController implements TopicsApi {
        .doOnEach(sig -> audit(context, sig));
  }

+  @Override
+  public Mono<ResponseEntity<Flux<TopicProducerStateDTO>>> getActiveProducerStates(String clusterName,
+                                                                                   String topicName,
+                                                                                   ServerWebExchange exchange) {
+    var context = AccessContext.builder()
+        .cluster(clusterName)
+        .topic(topicName)
+        .topicActions(VIEW)
+        .operationName("getActiveProducerStates")
+        .build();
+
+    Comparator<TopicProducerStateDTO> ordering =
+        Comparator.comparingInt(TopicProducerStateDTO::getPartition)
+            .thenComparing(Comparator.comparing(TopicProducerStateDTO::getProducerId).reversed());
+
+    Flux<TopicProducerStateDTO> states = topicsService.getActiveProducersState(getCluster(clusterName), topicName)
+        .flatMapMany(statesMap ->
+            Flux.fromStream(
+                statesMap.entrySet().stream()
+                    .flatMap(e -> e.getValue().stream().map(p -> clusterMapper.map(e.getKey().partition(), p)))
+                    .sorted(ordering)));
+
+    return validateAccess(context)
+        .thenReturn(states)
+        .map(ResponseEntity::ok)
+        .doOnEach(sig -> audit(context, sig));
+  }
+
  private Comparator<InternalTopic> getComparatorForTopic(
      TopicColumnsToSortDTO orderBy) {
    var defaultComparator = Comparator.comparing(InternalTopic::getName);
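The new getActiveProducerStates endpoint sorts the producer states by partition ascending and, within each partition, by producer id descending. An isolated illustration of that comparator pattern, using a made-up record type rather than the project's DTO:

import java.util.Comparator;
import java.util.List;

class ProducerStateOrderingDemo {
  record State(int partition, long producerId) {}

  public static void main(String[] args) {
    Comparator<State> ordering =
        Comparator.comparingInt(State::partition)
            .thenComparing(Comparator.comparing(State::producerId).reversed());

    List<State> sorted = List.of(new State(1, 5), new State(0, 7), new State(1, 9))
        .stream()
        .sorted(ordering)
        .toList();
    // partition 0 first, then partition 1 with the higher producer id before the lower one
    System.out.println(sorted);
  }
}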
@@ -5,7 +5,6 @@ import java.util.Collection;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.mutable.MutableLong;

@@ -28,7 +27,7 @@ class OffsetsInfo {
    this(consumer,
        consumer.partitionsFor(topic).stream()
            .map(pi -> new TopicPartition(topic, pi.partition()))
-           .collect(Collectors.toList())
+           .toList()
    );
  }
@@ -106,7 +106,7 @@ public class GlobalErrorWebExceptionHandler extends AbstractErrorWebExceptionHan
          err.setFieldName(e.getKey());
          err.setRestrictions(List.copyOf(e.getValue()));
          return err;
-       }).collect(Collectors.toList());
+       }).toList();

    var message = fieldsErrors.isEmpty()
        ? exception.getMessage()
@@ -30,11 +30,12 @@ import com.provectus.kafka.ui.model.ReplicaDTO;
import com.provectus.kafka.ui.model.TopicConfigDTO;
import com.provectus.kafka.ui.model.TopicDTO;
import com.provectus.kafka.ui.model.TopicDetailsDTO;
import com.provectus.kafka.ui.model.TopicProducerStateDTO;
import com.provectus.kafka.ui.service.metrics.RawMetric;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.admin.ProducerState;
import org.apache.kafka.common.acl.AccessControlEntry;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.acl.AclOperation;

@@ -54,7 +55,7 @@ public interface ClusterMapper {

  default ClusterMetricsDTO toClusterMetrics(Metrics metrics) {
    return new ClusterMetricsDTO()
-       .items(metrics.getSummarizedMetrics().map(this::convert).collect(Collectors.toList()));
+       .items(metrics.getSummarizedMetrics().map(this::convert).toList());
  }

  private MetricDTO convert(RawMetric rawMetric) {

@@ -66,7 +67,7 @@ public interface ClusterMapper {

  default BrokerMetricsDTO toBrokerMetrics(List<RawMetric> metrics) {
    return new BrokerMetricsDTO()
-       .metrics(metrics.stream().map(this::convert).collect(Collectors.toList()));
+       .metrics(metrics.stream().map(this::convert).toList());
  }

  @Mapping(target = "isSensitive", source = "sensitive")

@@ -107,7 +108,7 @@ public interface ClusterMapper {
  List<ClusterDTO.FeaturesEnum> toFeaturesEnum(List<ClusterFeature> features);

  default List<PartitionDTO> map(Map<Integer, InternalPartition> map) {
-   return map.values().stream().map(this::toPartition).collect(Collectors.toList());
+   return map.values().stream().map(this::toPartition).toList();
  }

  default BrokerDiskUsageDTO map(Integer id, InternalBrokerDiskUsage internalBrokerDiskUsage) {

@@ -118,6 +119,17 @@ public interface ClusterMapper {
    return brokerDiskUsage;
  }

+  default TopicProducerStateDTO map(int partition, ProducerState state) {
+    return new TopicProducerStateDTO()
+        .partition(partition)
+        .producerId(state.producerId())
+        .producerEpoch(state.producerEpoch())
+        .lastSequence(state.lastSequence())
+        .lastTimestampMs(state.lastTimestamp())
+        .coordinatorEpoch(state.coordinatorEpoch().stream().boxed().findAny().orElse(null))
+        .currentTransactionStartOffset(state.currentTransactionStartOffset().stream().boxed().findAny().orElse(null));
+  }
+
  static KafkaAclDTO.OperationEnum mapAclOperation(AclOperation operation) {
    return switch (operation) {
      case ALL -> KafkaAclDTO.OperationEnum.ALL;
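In the new map(int, ProducerState) method, coordinatorEpoch() and currentTransactionStartOffset() on the Kafka admin client's ProducerState are optional (OptionalInt/OptionalLong), and the .stream().boxed().findAny().orElse(null) chain turns an empty optional into a plain null for the DTO. A standalone sketch of that conversion (illustrative only):

import java.util.OptionalLong;

class OptionalToNullableDemo {
  static Long toNullable(OptionalLong value) {
    // OptionalLong has no map() or orElse(null) for a boxed result,
    // so go through LongStream: empty optional -> empty stream -> null.
    return value.stream().boxed().findAny().orElse(null);
  }

  public static void main(String[] args) {
    System.out.println(toNullable(OptionalLong.of(42L))); // 42
    System.out.println(toNullable(OptionalLong.empty())); // null
  }
}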
@@ -21,7 +21,7 @@ public class DescribeLogDirsMapper {
    return logDirsInfo.entrySet().stream().map(
        mapEntry -> mapEntry.getValue().entrySet().stream()
            .map(e -> toBrokerLogDirs(mapEntry.getKey(), e.getKey(), e.getValue()))
-           .collect(Collectors.toList())
+           .toList()
    ).flatMap(Collection::stream).collect(Collectors.toList());
  }

@@ -35,7 +35,7 @@ public class DescribeLogDirsMapper {
    var topics = logDirInfo.replicaInfos.entrySet().stream()
        .collect(Collectors.groupingBy(e -> e.getKey().topic())).entrySet().stream()
        .map(e -> toTopicLogDirs(broker, e.getKey(), e.getValue()))
-       .collect(Collectors.toList());
+       .toList();
    result.setTopics(topics);
    return result;
  }

@@ -48,7 +48,7 @@ public class DescribeLogDirsMapper {
    topic.setPartitions(
        partitions.stream().map(
            e -> topicPartitionLogDir(
-               broker, e.getKey().partition(), e.getValue())).collect(Collectors.toList())
+               broker, e.getKey().partition(), e.getValue())).toList()
    );
    return topic;
  }
@@ -44,7 +44,7 @@ public class InternalLogDirStats {
            topicMap.getValue().replicaInfos.entrySet().stream()
                .map(e -> Tuples.of(b.getKey(), e.getKey(), e.getValue().size))
            )
-       ).collect(toList());
+       ).toList();

    partitionsStats = topicPartitions.stream().collect(
        groupingBy(
@@ -52,6 +52,8 @@ public class AccessContext {
  }

  public static final class AccessContextBuilder {
+    private static final String ACTIONS_NOT_PRESENT = "actions not present";
+
    private Collection<ApplicationConfigAction> applicationConfigActions = Collections.emptySet();
    private String cluster;
    private Collection<ClusterConfigAction> clusterConfigActions = Collections.emptySet();

@@ -75,7 +77,7 @@ public class AccessContext {
    }

    public AccessContextBuilder applicationConfigActions(ApplicationConfigAction... actions) {
-     Assert.isTrue(actions.length > 0, "actions not present");
+     Assert.isTrue(actions.length > 0, ACTIONS_NOT_PRESENT);
      this.applicationConfigActions = List.of(actions);
      return this;
    }

@@ -86,7 +88,7 @@ public class AccessContext {
    }

    public AccessContextBuilder clusterConfigActions(ClusterConfigAction... actions) {
-     Assert.isTrue(actions.length > 0, "actions not present");
+     Assert.isTrue(actions.length > 0, ACTIONS_NOT_PRESENT);
      this.clusterConfigActions = List.of(actions);
      return this;
    }

@@ -97,7 +99,7 @@ public class AccessContext {
    }

    public AccessContextBuilder topicActions(TopicAction... actions) {
-     Assert.isTrue(actions.length > 0, "actions not present");
+     Assert.isTrue(actions.length > 0, ACTIONS_NOT_PRESENT);
      this.topicActions = List.of(actions);
      return this;
    }

@@ -108,7 +110,7 @@ public class AccessContext {
    }

    public AccessContextBuilder consumerGroupActions(ConsumerGroupAction... actions) {
-     Assert.isTrue(actions.length > 0, "actions not present");
+     Assert.isTrue(actions.length > 0, ACTIONS_NOT_PRESENT);
      this.consumerGroupActions = List.of(actions);
      return this;
    }

@@ -119,7 +121,7 @@ public class AccessContext {
    }

    public AccessContextBuilder connectActions(ConnectAction... actions) {
-     Assert.isTrue(actions.length > 0, "actions not present");
+     Assert.isTrue(actions.length > 0, ACTIONS_NOT_PRESENT);
      this.connectActions = List.of(actions);
      return this;
    }

@@ -135,25 +137,25 @@ public class AccessContext {
    }

    public AccessContextBuilder schemaActions(SchemaAction... actions) {
-     Assert.isTrue(actions.length > 0, "actions not present");
+     Assert.isTrue(actions.length > 0, ACTIONS_NOT_PRESENT);
      this.schemaActions = List.of(actions);
      return this;
    }

    public AccessContextBuilder ksqlActions(KsqlAction... actions) {
-     Assert.isTrue(actions.length > 0, "actions not present");
+     Assert.isTrue(actions.length > 0, ACTIONS_NOT_PRESENT);
      this.ksqlActions = List.of(actions);
      return this;
    }

    public AccessContextBuilder aclActions(AclAction... actions) {
-     Assert.isTrue(actions.length > 0, "actions not present");
+     Assert.isTrue(actions.length > 0, ACTIONS_NOT_PRESENT);
      this.aclActions = List.of(actions);
      return this;
    }

    public AccessContextBuilder auditActions(AuditAction... actions) {
-     Assert.isTrue(actions.length > 0, "actions not present");
+     Assert.isTrue(actions.length > 0, ACTIONS_NOT_PRESENT);
      this.auditActions = List.of(actions);
      return this;
    }
@@ -6,7 +6,6 @@ import com.provectus.kafka.ui.serdes.BuiltInSerde;
import java.util.Base64;
import java.util.Map;
import java.util.Optional;
import org.apache.kafka.common.header.Headers;

public class Base64Serde implements BuiltInSerde {
@@ -28,6 +28,23 @@ public class ConsumerOffsetsSerde implements BuiltInSerde {

  private static final JsonMapper JSON_MAPPER = createMapper();

+  private static final String ASSIGNMENT = "assignment";
+  private static final String CLIENT_HOST = "client_host";
+  private static final String CLIENT_ID = "client_id";
+  private static final String COMMIT_TIMESTAMP = "commit_timestamp";
+  private static final String CURRENT_STATE_TIMESTAMP = "current_state_timestamp";
+  private static final String GENERATION = "generation";
+  private static final String LEADER = "leader";
+  private static final String MEMBERS = "members";
+  private static final String MEMBER_ID = "member_id";
+  private static final String METADATA = "metadata";
+  private static final String OFFSET = "offset";
+  private static final String PROTOCOL = "protocol";
+  private static final String PROTOCOL_TYPE = "protocol_type";
+  private static final String REBALANCE_TIMEOUT = "rebalance_timeout";
+  private static final String SESSION_TIMEOUT = "session_timeout";
+  private static final String SUBSCRIPTION = "subscription";
+
  public static final String TOPIC = "__consumer_offsets";

  public static String name() {

@@ -116,128 +133,128 @@ public class ConsumerOffsetsSerde implements BuiltInSerde {
  private Deserializer valueDeserializer() {
    final Schema commitOffsetSchemaV0 =
        new Schema(
-           new Field("offset", Type.INT64, ""),
-           new Field("metadata", Type.STRING, ""),
-           new Field("commit_timestamp", Type.INT64, "")
+           new Field(OFFSET, Type.INT64, ""),
+           new Field(METADATA, Type.STRING, ""),
+           new Field(COMMIT_TIMESTAMP, Type.INT64, "")
        );

    final Schema commitOffsetSchemaV1 =
        new Schema(
-           new Field("offset", Type.INT64, ""),
-           new Field("metadata", Type.STRING, ""),
-           new Field("commit_timestamp", Type.INT64, ""),
+           new Field(OFFSET, Type.INT64, ""),
+           new Field(METADATA, Type.STRING, ""),
+           new Field(COMMIT_TIMESTAMP, Type.INT64, ""),
            new Field("expire_timestamp", Type.INT64, "")
        );

    final Schema commitOffsetSchemaV2 =
        new Schema(
-           new Field("offset", Type.INT64, ""),
-           new Field("metadata", Type.STRING, ""),
-           new Field("commit_timestamp", Type.INT64, "")
+           new Field(OFFSET, Type.INT64, ""),
+           new Field(METADATA, Type.STRING, ""),
+           new Field(COMMIT_TIMESTAMP, Type.INT64, "")
        );

    final Schema commitOffsetSchemaV3 =
        new Schema(
-           new Field("offset", Type.INT64, ""),
+           new Field(OFFSET, Type.INT64, ""),
            new Field("leader_epoch", Type.INT32, ""),
-           new Field("metadata", Type.STRING, ""),
-           new Field("commit_timestamp", Type.INT64, "")
+           new Field(METADATA, Type.STRING, ""),
+           new Field(COMMIT_TIMESTAMP, Type.INT64, "")
        );

    final Schema commitOffsetSchemaV4 = new Schema(
-       new Field("offset", Type.INT64, ""),
+       new Field(OFFSET, Type.INT64, ""),
        new Field("leader_epoch", Type.INT32, ""),
-       new Field("metadata", Type.COMPACT_STRING, ""),
-       new Field("commit_timestamp", Type.INT64, ""),
+       new Field(METADATA, Type.COMPACT_STRING, ""),
+       new Field(COMMIT_TIMESTAMP, Type.INT64, ""),
        Field.TaggedFieldsSection.of()
    );

    final Schema metadataSchema0 =
        new Schema(
-           new Field("protocol_type", Type.STRING, ""),
-           new Field("generation", Type.INT32, ""),
-           new Field("protocol", Type.NULLABLE_STRING, ""),
-           new Field("leader", Type.NULLABLE_STRING, ""),
-           new Field("members", new ArrayOf(new Schema(
-               new Field("member_id", Type.STRING, ""),
-               new Field("client_id", Type.STRING, ""),
-               new Field("client_host", Type.STRING, ""),
-               new Field("session_timeout", Type.INT32, ""),
-               new Field("subscription", Type.BYTES, ""),
-               new Field("assignment", Type.BYTES, "")
+           new Field(PROTOCOL_TYPE, Type.STRING, ""),
+           new Field(GENERATION, Type.INT32, ""),
+           new Field(PROTOCOL, Type.NULLABLE_STRING, ""),
+           new Field(LEADER, Type.NULLABLE_STRING, ""),
+           new Field(MEMBERS, new ArrayOf(new Schema(
+               new Field(MEMBER_ID, Type.STRING, ""),
+               new Field(CLIENT_ID, Type.STRING, ""),
+               new Field(CLIENT_HOST, Type.STRING, ""),
+               new Field(SESSION_TIMEOUT, Type.INT32, ""),
+               new Field(SUBSCRIPTION, Type.BYTES, ""),
+               new Field(ASSIGNMENT, Type.BYTES, "")
            )), "")
        );

    final Schema metadataSchema1 =
        new Schema(
-           new Field("protocol_type", Type.STRING, ""),
-           new Field("generation", Type.INT32, ""),
-           new Field("protocol", Type.NULLABLE_STRING, ""),
-           new Field("leader", Type.NULLABLE_STRING, ""),
-           new Field("members", new ArrayOf(new Schema(
-               new Field("member_id", Type.STRING, ""),
-               new Field("client_id", Type.STRING, ""),
-               new Field("client_host", Type.STRING, ""),
-               new Field("rebalance_timeout", Type.INT32, ""),
-               new Field("session_timeout", Type.INT32, ""),
-               new Field("subscription", Type.BYTES, ""),
-               new Field("assignment", Type.BYTES, "")
+           new Field(PROTOCOL_TYPE, Type.STRING, ""),
+           new Field(GENERATION, Type.INT32, ""),
+           new Field(PROTOCOL, Type.NULLABLE_STRING, ""),
+           new Field(LEADER, Type.NULLABLE_STRING, ""),
+           new Field(MEMBERS, new ArrayOf(new Schema(
+               new Field(MEMBER_ID, Type.STRING, ""),
+               new Field(CLIENT_ID, Type.STRING, ""),
+               new Field(CLIENT_HOST, Type.STRING, ""),
+               new Field(REBALANCE_TIMEOUT, Type.INT32, ""),
+               new Field(SESSION_TIMEOUT, Type.INT32, ""),
+               new Field(SUBSCRIPTION, Type.BYTES, ""),
+               new Field(ASSIGNMENT, Type.BYTES, "")
            )), "")
        );

    final Schema metadataSchema2 =
        new Schema(
-           new Field("protocol_type", Type.STRING, ""),
-           new Field("generation", Type.INT32, ""),
-           new Field("protocol", Type.NULLABLE_STRING, ""),
-           new Field("leader", Type.NULLABLE_STRING, ""),
-           new Field("current_state_timestamp", Type.INT64, ""),
-           new Field("members", new ArrayOf(new Schema(
-               new Field("member_id", Type.STRING, ""),
-               new Field("client_id", Type.STRING, ""),
-               new Field("client_host", Type.STRING, ""),
-               new Field("rebalance_timeout", Type.INT32, ""),
-               new Field("session_timeout", Type.INT32, ""),
-               new Field("subscription", Type.BYTES, ""),
-               new Field("assignment", Type.BYTES, "")
+           new Field(PROTOCOL_TYPE, Type.STRING, ""),
+           new Field(GENERATION, Type.INT32, ""),
+           new Field(PROTOCOL, Type.NULLABLE_STRING, ""),
+           new Field(LEADER, Type.NULLABLE_STRING, ""),
+           new Field(CURRENT_STATE_TIMESTAMP, Type.INT64, ""),
+           new Field(MEMBERS, new ArrayOf(new Schema(
+               new Field(MEMBER_ID, Type.STRING, ""),
+               new Field(CLIENT_ID, Type.STRING, ""),
+               new Field(CLIENT_HOST, Type.STRING, ""),
+               new Field(REBALANCE_TIMEOUT, Type.INT32, ""),
+               new Field(SESSION_TIMEOUT, Type.INT32, ""),
+               new Field(SUBSCRIPTION, Type.BYTES, ""),
+               new Field(ASSIGNMENT, Type.BYTES, "")
            )), "")
        );

    final Schema metadataSchema3 =
        new Schema(
-           new Field("protocol_type", Type.STRING, ""),
-           new Field("generation", Type.INT32, ""),
-           new Field("protocol", Type.NULLABLE_STRING, ""),
-           new Field("leader", Type.NULLABLE_STRING, ""),
-           new Field("current_state_timestamp", Type.INT64, ""),
-           new Field("members", new ArrayOf(new Schema(
-               new Field("member_id", Type.STRING, ""),
+           new Field(PROTOCOL_TYPE, Type.STRING, ""),
+           new Field(GENERATION, Type.INT32, ""),
+           new Field(PROTOCOL, Type.NULLABLE_STRING, ""),
+           new Field(LEADER, Type.NULLABLE_STRING, ""),
+           new Field(CURRENT_STATE_TIMESTAMP, Type.INT64, ""),
+           new Field(MEMBERS, new ArrayOf(new Schema(
+               new Field(MEMBER_ID, Type.STRING, ""),
                new Field("group_instance_id", Type.NULLABLE_STRING, ""),
-               new Field("client_id", Type.STRING, ""),
-               new Field("client_host", Type.STRING, ""),
-               new Field("rebalance_timeout", Type.INT32, ""),
-               new Field("session_timeout", Type.INT32, ""),
-               new Field("subscription", Type.BYTES, ""),
-               new Field("assignment", Type.BYTES, "")
+               new Field(CLIENT_ID, Type.STRING, ""),
+               new Field(CLIENT_HOST, Type.STRING, ""),
+               new Field(REBALANCE_TIMEOUT, Type.INT32, ""),
+               new Field(SESSION_TIMEOUT, Type.INT32, ""),
+               new Field(SUBSCRIPTION, Type.BYTES, ""),
+               new Field(ASSIGNMENT, Type.BYTES, "")
            )), "")
        );

    final Schema metadataSchema4 =
        new Schema(
-           new Field("protocol_type", Type.COMPACT_STRING, ""),
-           new Field("generation", Type.INT32, ""),
-           new Field("protocol", Type.COMPACT_NULLABLE_STRING, ""),
-           new Field("leader", Type.COMPACT_NULLABLE_STRING, ""),
-           new Field("current_state_timestamp", Type.INT64, ""),
-           new Field("members", new CompactArrayOf(new Schema(
-               new Field("member_id", Type.COMPACT_STRING, ""),
+           new Field(PROTOCOL_TYPE, Type.COMPACT_STRING, ""),
+           new Field(GENERATION, Type.INT32, ""),
+           new Field(PROTOCOL, Type.COMPACT_NULLABLE_STRING, ""),
+           new Field(LEADER, Type.COMPACT_NULLABLE_STRING, ""),
+           new Field(CURRENT_STATE_TIMESTAMP, Type.INT64, ""),
+           new Field(MEMBERS, new CompactArrayOf(new Schema(
+               new Field(MEMBER_ID, Type.COMPACT_STRING, ""),
                new Field("group_instance_id", Type.COMPACT_NULLABLE_STRING, ""),
-               new Field("client_id", Type.COMPACT_STRING, ""),
-               new Field("client_host", Type.COMPACT_STRING, ""),
-               new Field("rebalance_timeout", Type.INT32, ""),
-               new Field("session_timeout", Type.INT32, ""),
-               new Field("subscription", Type.COMPACT_BYTES, ""),
-               new Field("assignment", Type.COMPACT_BYTES, ""),
+               new Field(CLIENT_ID, Type.COMPACT_STRING, ""),
+               new Field(CLIENT_HOST, Type.COMPACT_STRING, ""),
+               new Field(REBALANCE_TIMEOUT, Type.INT32, ""),
+               new Field(SESSION_TIMEOUT, Type.INT32, ""),
+               new Field(SUBSCRIPTION, Type.COMPACT_BYTES, ""),
+               new Field(ASSIGNMENT, Type.COMPACT_BYTES, ""),
                Field.TaggedFieldsSection.of()
            )), ""),
            Field.TaggedFieldsSection.of()

@@ -249,7 +266,7 @@ public class ConsumerOffsetsSerde implements BuiltInSerde {
      short version = bb.getShort();
      // ideally, we should distinguish if value is commit or metadata
      // by checking record's key, but our current serde structure doesn't allow that.
-     // so, we trying to parse into metadata first and after into commit msg
+     // so, we are trying to parse into metadata first and after into commit msg
      try {
        result = toJson(
            switch (version) {
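As the amended comment notes, the value of a __consumer_offsets record does not say by itself whether it is a group-metadata message or an offset-commit message, so the serde attempts the metadata schemas first and falls back to the commit schemas. A rough sketch of that try-then-fallback shape using the same org.apache.kafka.common.protocol.types API (schema definitions elided, names illustrative, not the project's code):

import java.nio.ByteBuffer;
import org.apache.kafka.common.protocol.types.Schema;
import org.apache.kafka.common.protocol.types.Struct;

final class OffsetsValueParser {
  // Try the group-metadata schema first; if parsing fails, rewind and try the commit schema.
  static Struct parse(Schema metadataSchema, Schema commitSchema, ByteBuffer value) {
    value.mark();
    try {
      return metadataSchema.read(value);
    } catch (RuntimeException metadataParseError) {
      value.reset();
      return commitSchema.read(value);
    }
  }
}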
@@ -2,7 +2,6 @@ package com.provectus.kafka.ui.serdes.builtin;

import com.google.common.primitives.Ints;
import com.provectus.kafka.ui.serde.api.DeserializeResult;
import com.provectus.kafka.ui.serde.api.PropertyResolver;
import com.provectus.kafka.ui.serde.api.SchemaDescription;
import com.provectus.kafka.ui.serdes.BuiltInSerde;
import java.util.Map;
@@ -1,46 +0,0 @@
-package com.provectus.kafka.ui.serdes.builtin.sr;
-
-import com.provectus.kafka.ui.util.jsonschema.JsonAvroConversion;
-import io.confluent.kafka.schemaregistry.ParsedSchema;
-import io.confluent.kafka.schemaregistry.avro.AvroSchema;
-import io.confluent.kafka.schemaregistry.client.SchemaMetadata;
-import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
-import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
-import io.confluent.kafka.serializers.KafkaAvroSerializer;
-import io.confluent.kafka.serializers.KafkaAvroSerializerConfig;
-import java.util.Map;
-import org.apache.kafka.common.serialization.Serializer;
-
-class AvroSchemaRegistrySerializer extends SchemaRegistrySerializer<Object> {
-
-  AvroSchemaRegistrySerializer(String topic, boolean isKey,
-                               SchemaRegistryClient client,
-                               SchemaMetadata schema) {
-    super(topic, isKey, client, schema);
-  }
-
-  @Override
-  protected Serializer<Object> createSerializer(SchemaRegistryClient client) {
-    var serializer = new KafkaAvroSerializer(client);
-    serializer.configure(
-        Map.of(
-            "schema.registry.url", "wontbeused",
-            AbstractKafkaSchemaSerDeConfig.AUTO_REGISTER_SCHEMAS, false,
-            KafkaAvroSerializerConfig.AVRO_USE_LOGICAL_TYPE_CONVERTERS_CONFIG, true,
-            AbstractKafkaSchemaSerDeConfig.USE_LATEST_VERSION, true
-        ),
-        isKey
-    );
-    return serializer;
-  }
-
-  @Override
-  protected Object serialize(String value, ParsedSchema schema) {
-    try {
-      return JsonAvroConversion.convertJsonToAvro(value, ((AvroSchema) schema).rawSchema());
-    } catch (Throwable e) {
-      throw new RuntimeException("Failed to serialize record for topic " + topic, e);
-    }
-  }
-}
@@ -1,79 +0,0 @@
-package com.provectus.kafka.ui.serdes.builtin.sr;
-
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.provectus.kafka.ui.exception.ValidationException;
-import com.provectus.kafka.ui.util.annotation.KafkaClientInternalsDependant;
-import io.confluent.kafka.schemaregistry.ParsedSchema;
-import io.confluent.kafka.schemaregistry.client.SchemaMetadata;
-import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
-import io.confluent.kafka.schemaregistry.json.JsonSchema;
-import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
-import io.confluent.kafka.serializers.json.KafkaJsonSchemaSerializer;
-import java.util.Map;
-import org.apache.kafka.common.serialization.Serializer;
-
-class JsonSchemaSchemaRegistrySerializer extends SchemaRegistrySerializer<JsonNode> {
-
-  private static final ObjectMapper MAPPER = new ObjectMapper();
-
-  JsonSchemaSchemaRegistrySerializer(String topic,
-                                     boolean isKey,
-                                     SchemaRegistryClient client,
-                                     SchemaMetadata schema) {
-    super(topic, isKey, client, schema);
-  }
-
-  @Override
-  protected Serializer<JsonNode> createSerializer(SchemaRegistryClient client) {
-    var serializer = new KafkaJsonSchemaSerializerWithoutSchemaInfer(client);
-    serializer.configure(
-        Map.of(
-            "schema.registry.url", "wontbeused",
-            AbstractKafkaSchemaSerDeConfig.AUTO_REGISTER_SCHEMAS, false,
-            AbstractKafkaSchemaSerDeConfig.USE_LATEST_VERSION, true
-        ),
-        isKey
-    );
-    return serializer;
-  }
-
-  @Override
-  protected JsonNode serialize(String value, ParsedSchema schema) {
-    try {
-      JsonNode json = MAPPER.readTree(value);
-      ((JsonSchema) schema).validate(json);
-      return json;
-    } catch (JsonProcessingException e) {
-      throw new ValidationException(String.format("'%s' is not valid json", value));
-    } catch (org.everit.json.schema.ValidationException e) {
-      throw new ValidationException(
-          String.format("'%s' does not fit schema: %s", value, e.getAllMessages()));
-    }
-  }
-
-  @KafkaClientInternalsDependant
-  private class KafkaJsonSchemaSerializerWithoutSchemaInfer
-      extends KafkaJsonSchemaSerializer<JsonNode> {
-
-    KafkaJsonSchemaSerializerWithoutSchemaInfer(SchemaRegistryClient client) {
-      super(client);
-    }
-
-    /**
-     * Need to override original method because it tries to infer schema from input
-     * by checking 'schema' json field or @Schema annotation on input class, which is not
-     * possible in our case. So, we just skip all infer logic and pass schema directly.
-     */
-    @Override
-    public byte[] serialize(String topic, JsonNode rec) {
-      return super.serializeImpl(
-          super.getSubjectName(topic, isKey, rec, schema),
-          rec,
-          (JsonSchema) schema
-      );
-    }
-  }
-}
@@ -1,50 +0,0 @@
-package com.provectus.kafka.ui.serdes.builtin.sr;
-
-import com.google.protobuf.DynamicMessage;
-import com.google.protobuf.Message;
-import com.google.protobuf.util.JsonFormat;
-import io.confluent.kafka.schemaregistry.ParsedSchema;
-import io.confluent.kafka.schemaregistry.client.SchemaMetadata;
-import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
-import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;
-import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
-import io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer;
-import java.util.Map;
-import lombok.SneakyThrows;
-import org.apache.kafka.common.serialization.Serializer;
-
-class ProtobufSchemaRegistrySerializer extends SchemaRegistrySerializer<Message> {
-
-  @SneakyThrows
-  public ProtobufSchemaRegistrySerializer(String topic, boolean isKey,
-                                          SchemaRegistryClient client, SchemaMetadata schema) {
-    super(topic, isKey, client, schema);
-  }
-
-  @Override
-  protected Serializer<Message> createSerializer(SchemaRegistryClient client) {
-    var serializer = new KafkaProtobufSerializer<>(client);
-    serializer.configure(
-        Map.of(
-            "schema.registry.url", "wontbeused",
-            AbstractKafkaSchemaSerDeConfig.AUTO_REGISTER_SCHEMAS, false,
-            AbstractKafkaSchemaSerDeConfig.USE_LATEST_VERSION, true
-        ),
-        isKey
-    );
-    return serializer;
-  }
-
-  @Override
-  protected Message serialize(String value, ParsedSchema schema) {
-    ProtobufSchema protobufSchema = (ProtobufSchema) schema;
-    DynamicMessage.Builder builder = protobufSchema.newMessageBuilder();
-    try {
-      JsonFormat.parser().merge(value, builder);
-      return builder.build();
-    } catch (Throwable e) {
-      throw new RuntimeException("Failed to serialize record for topic " + topic, e);
-    }
-  }
-}
@@ -1,5 +1,8 @@
package com.provectus.kafka.ui.serdes.builtin.sr;

+import static com.provectus.kafka.ui.serdes.builtin.sr.Serialize.serializeAvro;
+import static com.provectus.kafka.ui.serdes.builtin.sr.Serialize.serializeJson;
+import static com.provectus.kafka.ui.serdes.builtin.sr.Serialize.serializeProto;
import static io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig.BASIC_AUTH_CREDENTIALS_SOURCE;
import static io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig.USER_INFO_CONFIG;

@@ -7,7 +10,6 @@ import com.google.common.annotations.VisibleForTesting;
import com.provectus.kafka.ui.exception.ValidationException;
import com.provectus.kafka.ui.serde.api.DeserializeResult;
import com.provectus.kafka.ui.serde.api.PropertyResolver;
import com.provectus.kafka.ui.serde.api.RecordHeaders;
import com.provectus.kafka.ui.serde.api.SchemaDescription;
import com.provectus.kafka.ui.serdes.BuiltInSerde;
import com.provectus.kafka.ui.util.jsonschema.AvroJsonSchemaConverter;

@@ -32,17 +34,21 @@ import java.util.Map;
import java.util.Optional;
import java.util.concurrent.Callable;
import javax.annotation.Nullable;
import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
import org.apache.kafka.common.config.SslConfigs;


public class SchemaRegistrySerde implements BuiltInSerde {

+  private static final byte SR_PAYLOAD_MAGIC_BYTE = 0x0;
+  private static final int SR_PAYLOAD_PREFIX_LENGTH = 5;

  public static String name() {
    return "SchemaRegistry";
  }

+  private static final String SCHEMA_REGISTRY = "schemaRegistry";

  private SchemaRegistryClient schemaRegistryClient;
  private List<String> schemaRegistryUrls;
  private String valueSchemaNameTemplate;

@@ -54,7 +60,7 @@ public class SchemaRegistrySerde implements BuiltInSerde {
  @Override
  public boolean canBeAutoConfigured(PropertyResolver kafkaClusterProperties,
                                     PropertyResolver globalProperties) {
-   return kafkaClusterProperties.getListProperty("schemaRegistry", String.class)
+   return kafkaClusterProperties.getListProperty(SCHEMA_REGISTRY, String.class)
        .filter(lst -> !lst.isEmpty())
        .isPresent();
  }

@@ -62,7 +68,7 @@ public class SchemaRegistrySerde implements BuiltInSerde {
  @Override
  public void autoConfigure(PropertyResolver kafkaClusterProperties,
                            PropertyResolver globalProperties) {
-   var urls = kafkaClusterProperties.getListProperty("schemaRegistry", String.class)
+   var urls = kafkaClusterProperties.getListProperty(SCHEMA_REGISTRY, String.class)
        .filter(lst -> !lst.isEmpty())
        .orElseThrow(() -> new ValidationException("No urls provided for schema registry"));
    configure(

@@ -88,7 +94,7 @@ public class SchemaRegistrySerde implements BuiltInSerde {
                        PropertyResolver kafkaClusterProperties,
                        PropertyResolver globalProperties) {
    var urls = serdeProperties.getListProperty("url", String.class)
-       .or(() -> kafkaClusterProperties.getListProperty("schemaRegistry", String.class))
+       .or(() -> kafkaClusterProperties.getListProperty(SCHEMA_REGISTRY, String.class))
        .filter(lst -> !lst.isEmpty())
        .orElseThrow(() -> new ValidationException("No urls provided for schema registry"));
    configure(

@@ -219,8 +225,8 @@ public class SchemaRegistrySerde implements BuiltInSerde {
          .convert(basePath, ((AvroSchema) parsedSchema).rawSchema())
          .toJson();
      case JSON ->
-         //need to use confluent JsonSchema since it includes resolved references
-         ((JsonSchema) parsedSchema).rawSchema().toString();
+         //need to use confluent JsonSchema since it includes resolved references
+         ((JsonSchema) parsedSchema).rawSchema().toString();
    };
  }

@@ -252,35 +258,27 @@ public class SchemaRegistrySerde implements BuiltInSerde {
  @Override
  public Serializer serializer(String topic, Target type) {
    String subject = schemaSubject(topic, type);
-   var schema = getSchemaBySubject(subject)
-       .orElseThrow(() -> new ValidationException(String.format("No schema for subject '%s' found", subject)));
-   boolean isKey = type == Target.KEY;
-   SchemaType schemaType = SchemaType.fromString(schema.getSchemaType())
-       .orElseThrow(() -> new IllegalStateException("Unknown schema type: " + schema.getSchemaType()));
+   SchemaMetadata meta = getSchemaBySubject(subject)
+       .orElseThrow(() -> new ValidationException(
+           String.format("No schema for subject '%s' found", subject)));
+   ParsedSchema schema = getSchemaById(meta.getId())
+       .orElseThrow(() -> new IllegalStateException(
+           String.format("Schema found for id %s, subject '%s'", meta.getId(), subject)));
+   SchemaType schemaType = SchemaType.fromString(meta.getSchemaType())
+       .orElseThrow(() -> new IllegalStateException("Unknown schema type: " + meta.getSchemaType()));
    return switch (schemaType) {
-     case PROTOBUF -> new ProtobufSchemaRegistrySerializer(topic, isKey, schemaRegistryClient, schema);
-     case AVRO -> new AvroSchemaRegistrySerializer(topic, isKey, schemaRegistryClient, schema);
-     case JSON -> new JsonSchemaSchemaRegistrySerializer(topic, isKey, schemaRegistryClient, schema);
+     case PROTOBUF -> input ->
+         serializeProto(schemaRegistryClient, topic, type, (ProtobufSchema) schema, meta.getId(), input);
+     case AVRO -> input ->
+         serializeAvro((AvroSchema) schema, meta.getId(), input);
+     case JSON -> input ->
+         serializeJson((JsonSchema) schema, meta.getId(), input);
    };
  }

  @Override
  public Deserializer deserializer(String topic, Target type) {
-   return new SrDeserializer(topic);
-  }
-
-  ///--------------------------------------------------------------
-
-  private static final byte SR_RECORD_MAGIC_BYTE = (byte) 0;
-  private static final int SR_RECORD_PREFIX_LENGTH = 5;
-
-  @RequiredArgsConstructor
-  private class SrDeserializer implements Deserializer {
-
-    private final String topic;
-
-    @Override
-    public DeserializeResult deserialize(RecordHeaders headers, byte[] data) {
+   return (headers, data) -> {
      var schemaId = extractSchemaIdFromMsg(data);
      SchemaType format = getMessageFormatBySchemaId(schemaId);
      MessageFormatter formatter = schemaRegistryFormatters.get(format);

@@ -292,7 +290,7 @@ public class SchemaRegistrySerde implements BuiltInSerde {
          "type", format.name()
        )
      );
-   }
+   };
  }

  private SchemaType getMessageFormatBySchemaId(int schemaId) {

@@ -304,7 +302,7 @@ public class SchemaRegistrySerde implements BuiltInSerde {

  private int extractSchemaIdFromMsg(byte[] data) {
    ByteBuffer buffer = ByteBuffer.wrap(data);
-   if (buffer.remaining() > SR_RECORD_PREFIX_LENGTH && buffer.get() == SR_RECORD_MAGIC_BYTE) {
+   if (buffer.remaining() >= SR_PAYLOAD_PREFIX_LENGTH && buffer.get() == SR_PAYLOAD_MAGIC_BYTE) {
      return buffer.getInt();
    }
    throw new ValidationException(
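Both the new Serialize helper introduced further below and extractSchemaIdFromMsg follow the Confluent Schema Registry wire format: a single magic byte 0x0, a 4-byte big-endian schema id, then the encoded payload. A minimal sketch of framing and unframing that prefix (illustrative, not the project's code):

import java.nio.ByteBuffer;

final class WireFormatDemo {
  private static final byte MAGIC = 0x0;

  // Build [magic byte][4-byte schema id][payload]; ByteBuffer is big-endian by default.
  static byte[] frame(int schemaId, byte[] payload) {
    return ByteBuffer.allocate(1 + Integer.BYTES + payload.length)
        .put(MAGIC)
        .putInt(schemaId)
        .put(payload)
        .array();
  }

  // Read back the schema id, rejecting anything without the expected prefix.
  static int schemaIdOf(byte[] framed) {
    ByteBuffer buffer = ByteBuffer.wrap(framed);
    if (buffer.remaining() >= 5 && buffer.get() == MAGIC) {
      return buffer.getInt();
    }
    throw new IllegalArgumentException("Not a Schema Registry framed message");
  }

  public static void main(String[] args) {
    byte[] framed = frame(42, new byte[] {1, 2, 3});
    System.out.println(schemaIdOf(framed)); // 42
  }
}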
@@ -1,34 +0,0 @@
-package com.provectus.kafka.ui.serdes.builtin.sr;
-
-import com.provectus.kafka.ui.serde.api.Serde;
-import io.confluent.kafka.schemaregistry.ParsedSchema;
-import io.confluent.kafka.schemaregistry.client.SchemaMetadata;
-import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
-import lombok.SneakyThrows;
-import org.apache.kafka.common.serialization.Serializer;
-
-abstract class SchemaRegistrySerializer<T> implements Serde.Serializer {
-  protected final Serializer<T> serializer;
-  protected final String topic;
-  protected final boolean isKey;
-  protected final ParsedSchema schema;
-
-  @SneakyThrows
-  protected SchemaRegistrySerializer(String topic, boolean isKey, SchemaRegistryClient client,
-                                     SchemaMetadata schema) {
-    this.topic = topic;
-    this.isKey = isKey;
-    this.serializer = createSerializer(client);
-    this.schema = client.getSchemaById(schema.getId());
-  }
-
-  protected abstract Serializer<T> createSerializer(SchemaRegistryClient client);
-
-  @Override
-  public byte[] serialize(String input) {
-    final T read = this.serialize(input, schema);
-    return this.serializer.serialize(topic, read);
-  }
-
-  protected abstract T serialize(String value, ParsedSchema schema);
-}
@@ -0,0 +1,126 @@
package com.provectus.kafka.ui.serdes.builtin.sr;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import com.google.protobuf.DynamicMessage;
import com.google.protobuf.Message;
import com.google.protobuf.util.JsonFormat;
import com.provectus.kafka.ui.exception.ValidationException;
import com.provectus.kafka.ui.serde.api.Serde;
import com.provectus.kafka.ui.util.annotation.KafkaClientInternalsDependant;
import com.provectus.kafka.ui.util.jsonschema.JsonAvroConversion;
import io.confluent.kafka.schemaregistry.avro.AvroSchema;
import io.confluent.kafka.schemaregistry.avro.AvroSchemaUtils;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.schemaregistry.json.JsonSchema;
import io.confluent.kafka.schemaregistry.json.jackson.Jackson;
import io.confluent.kafka.schemaregistry.protobuf.MessageIndexes;
import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;
import io.confluent.kafka.serializers.protobuf.AbstractKafkaProtobufSerializer;
import io.confluent.kafka.serializers.subject.DefaultReferenceSubjectNameStrategy;
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.util.HashMap;
import lombok.SneakyThrows;
import org.apache.avro.Schema;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.EncoderFactory;

final class Serialize {

  private static final byte MAGIC = 0x0;
  private static final ObjectMapper JSON_SERIALIZE_MAPPER = Jackson.newObjectMapper(); //from confluent package

  private Serialize() {
  }

  @KafkaClientInternalsDependant("AbstractKafkaJsonSchemaSerializer::serializeImpl")
  @SneakyThrows
  static byte[] serializeJson(JsonSchema schema, int schemaId, String value) {
    JsonNode json;
    try {
      json = JSON_SERIALIZE_MAPPER.readTree(value);
    } catch (JsonProcessingException e) {
      throw new ValidationException(String.format("'%s' is not valid json", value));
    }
    try {
      schema.validate(json);
    } catch (org.everit.json.schema.ValidationException e) {
      throw new ValidationException(
          String.format("'%s' does not fit schema: %s", value, e.getAllMessages()));
    }
    try (var out = new ByteArrayOutputStream()) {
      out.write(MAGIC);
      out.write(schemaId(schemaId));
      out.write(JSON_SERIALIZE_MAPPER.writeValueAsBytes(json));
      return out.toByteArray();
    }
  }

  @KafkaClientInternalsDependant("AbstractKafkaProtobufSerializer::serializeImpl")
  @SneakyThrows
  static byte[] serializeProto(SchemaRegistryClient srClient,
                               String topic,
                               Serde.Target target,
                               ProtobufSchema schema,
                               int schemaId,
                               String input) {
    // flags are tuned like in ProtobufSerializer by default
    boolean normalizeSchema = false;
    boolean autoRegisterSchema = false;
    boolean useLatestVersion = true;
    boolean latestCompatStrict = true;
    boolean skipKnownTypes = true;

    schema = AbstractKafkaProtobufSerializer.resolveDependencies(
        srClient, normalizeSchema, autoRegisterSchema, useLatestVersion, latestCompatStrict,
        new HashMap<>(), skipKnownTypes, new DefaultReferenceSubjectNameStrategy(),
        topic, target == Serde.Target.KEY, schema
    );

    DynamicMessage.Builder builder = schema.newMessageBuilder();
    JsonFormat.parser().merge(input, builder);
    Message message = builder.build();
    MessageIndexes indexes = schema.toMessageIndexes(message.getDescriptorForType().getFullName(), normalizeSchema);
    try (var out = new ByteArrayOutputStream()) {
      out.write(MAGIC);
      out.write(schemaId(schemaId));
      out.write(indexes.toByteArray());
      message.writeTo(out);
      return out.toByteArray();
    }
  }

  @KafkaClientInternalsDependant("AbstractKafkaAvroSerializer::serializeImpl")
  @SneakyThrows
  static byte[] serializeAvro(AvroSchema schema, int schemaId, String input) {
    var avroObject = JsonAvroConversion.convertJsonToAvro(input, schema.rawSchema());
    try (var out = new ByteArrayOutputStream()) {
      out.write(MAGIC);
      out.write(schemaId(schemaId));
      Schema rawSchema = schema.rawSchema();
      if (rawSchema.getType().equals(Schema.Type.BYTES)) {
        Preconditions.checkState(
            avroObject instanceof ByteBuffer,
            "Unrecognized bytes object of type: " + avroObject.getClass().getName()
        );
        out.write(((ByteBuffer) avroObject).array());
      } else {
        boolean useLogicalTypeConverters = true;
        BinaryEncoder encoder = EncoderFactory.get().directBinaryEncoder(out, null);
        DatumWriter<Object> writer =
            (DatumWriter<Object>) AvroSchemaUtils.getDatumWriter(avroObject, rawSchema, useLogicalTypeConverters);
        writer.write(avroObject, encoder);
        encoder.flush();
      }
      return out.toByteArray();
    }
  }

  private static byte[] schemaId(int id) {
    return ByteBuffer.allocate(Integer.BYTES).putInt(id).array();
  }
}
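Illustration only, not part of this commit: the helpers above all emit the same framing of magic byte, 4-byte schema id, then the serialized body. The sketch below builds an equivalently framed Avro record with plain Apache Avro (the schema, record contents, and schema id 42 are invented; serializeAvro itself goes through JsonAvroConversion and AvroSchemaUtils instead):

import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.EncoderFactory;

public class WireFormatSketch {
  public static void main(String[] args) throws Exception {
    Schema schema = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"User\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"}]}");
    GenericData.Record record = new GenericData.Record(schema);
    record.put("name", "alice");

    int schemaId = 42; // id the registry would have assigned (hypothetical)
    try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
      out.write(0x0);                                                           // magic byte
      out.write(ByteBuffer.allocate(Integer.BYTES).putInt(schemaId).array());   // schema id
      BinaryEncoder encoder = EncoderFactory.get().directBinaryEncoder(out, null);
      new GenericDatumWriter<GenericData.Record>(schema).write(record, encoder); // Avro binary body
      encoder.flush();
      byte[] framed = out.toByteArray(); // what a Schema Registry-aware consumer expects
      System.out.println("framed length = " + framed.length);
    }
  }
}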
@@ -31,6 +31,7 @@ import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import javax.annotation.Nullable;
import lombok.AccessLevel;
@@ -55,6 +56,7 @@ import org.apache.kafka.clients.admin.NewPartitionReassignment;
import org.apache.kafka.clients.admin.NewPartitions;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.admin.ProducerState;
import org.apache.kafka.clients.admin.RecordsToDelete;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
@@ -658,6 +660,21 @@ public class ReactiveAdminClient implements Closeable {
    return toMono(client.alterReplicaLogDirs(replicaAssignment).all());
  }

  // returns tp -> list of active producers' states (if any)
  public Mono<Map<TopicPartition, List<ProducerState>>> getActiveProducersState(String topic) {
    return describeTopic(topic)
        .map(td -> client.describeProducers(
                IntStream.range(0, td.partitions().size())
                    .mapToObj(i -> new TopicPartition(topic, i))
                    .toList()
            ).all()
        )
        .flatMap(ReactiveAdminClient::toMono)
        .map(map -> map.entrySet().stream()
            .filter(e -> !e.getValue().activeProducers().isEmpty()) // skipping partitions without producers
            .collect(toMap(Map.Entry::getKey, e -> e.getValue().activeProducers())));
  }

  private Mono<Void> incrementalAlterConfig(String topicName,
                                            List<ConfigEntry> currentConfigs,
                                            Map<String, String> newConfigs) {
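Illustration only: the reactive method above is a thin wrapper around Admin#describeProducers from kafka-clients. A blocking sketch of the same lookup, with a placeholder bootstrap address and a hardcoded three-partition topic, might look like this:

import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.DescribeProducersResult;
import org.apache.kafka.clients.admin.ProducerState;
import org.apache.kafka.common.TopicPartition;

public class ActiveProducersSketch {
  public static void main(String[] args) throws Exception {
    Properties props = new Properties();
    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder address
    try (Admin admin = Admin.create(props)) {
      // Assume a three-partition topic named "my-topic"; the service above derives
      // the partition list from describeTopic() instead of hardcoding it.
      List<TopicPartition> partitions = List.of(
          new TopicPartition("my-topic", 0),
          new TopicPartition("my-topic", 1),
          new TopicPartition("my-topic", 2));
      DescribeProducersResult result = admin.describeProducers(partitions);
      Map<TopicPartition, DescribeProducersResult.PartitionProducerState> states = result.all().get();
      states.forEach((tp, state) -> {
        if (state.activeProducers().isEmpty()) {
          return; // same filtering as in getActiveProducersState
        }
        for (ProducerState ps : state.activeProducers()) {
          System.out.printf("%s producerId=%d epoch=%d lastSequence=%d%n",
              tp, ps.producerId(), ps.producerEpoch(), ps.lastSequence());
        }
      });
    }
  }
}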
@@ -39,6 +39,7 @@ import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.admin.NewPartitionReassignment;
import org.apache.kafka.clients.admin.NewPartitions;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.admin.ProducerState;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
@@ -459,6 +460,11 @@ public class TopicsService {
    );
  }

  public Mono<Map<TopicPartition, List<ProducerState>>> getActiveProducersState(KafkaCluster cluster, String topic) {
    return adminClientService.get(cluster)
        .flatMap(ac -> ac.getActiveProducersState(topic));
  }

  private Mono<List<String>> filterExisting(KafkaCluster cluster, Collection<String> topics) {
    return adminClientService.get(cluster)
        .flatMap(ac -> ac.listTopics(true))
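Illustration only: a controller layer would then flatten this map into the TopicProducerState objects declared in the OpenAPI contract further below. The DTO record in this sketch is hypothetical, not the project's generated model:

import org.apache.kafka.clients.admin.ProducerState;
import org.apache.kafka.common.TopicPartition;

// Hypothetical mapping from the admin-client ProducerState to the
// TopicProducerState shape declared in the OpenAPI schema.
class TopicProducerStateMapper {

  record TopicProducerStateDto(int partition, long producerId, int producerEpoch,
                               int lastSequence, long lastTimestampMs,
                               int coordinatorEpoch, long currentTransactionStartOffset) {}

  static TopicProducerStateDto toDto(TopicPartition tp, ProducerState state) {
    return new TopicProducerStateDto(
        tp.partition(),
        state.producerId(),
        state.producerEpoch(),
        state.lastSequence(),
        state.lastTimestamp(),
        state.coordinatorEpoch().orElse(-1),                 // OptionalInt: absent on older brokers
        state.currentTransactionStartOffset().orElse(-1L));  // OptionalLong: absent without an open transaction
  }
}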
@ -6,7 +6,6 @@ import static com.provectus.kafka.ui.service.MessagesService.createProducer;
|
|||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.provectus.kafka.ui.config.ClustersProperties;
|
||||
import com.provectus.kafka.ui.config.auth.AuthenticatedUser;
|
||||
import com.provectus.kafka.ui.config.auth.RbacUser;
|
||||
import com.provectus.kafka.ui.model.KafkaCluster;
|
||||
import com.provectus.kafka.ui.model.rbac.AccessContext;
|
||||
import com.provectus.kafka.ui.service.AdminClientService;
|
||||
|
@ -21,6 +20,7 @@ import java.util.Optional;
|
|||
import java.util.Set;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.function.Supplier;
|
||||
import java.util.stream.Collectors;
|
||||
import javax.annotation.Nullable;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.kafka.clients.producer.KafkaProducer;
|
||||
|
@ -28,7 +28,9 @@ import org.apache.kafka.clients.producer.ProducerConfig;
|
|||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.security.core.GrantedAuthority;
|
||||
import org.springframework.security.core.context.SecurityContext;
|
||||
import org.springframework.security.core.userdetails.UserDetails;
|
||||
import org.springframework.stereotype.Service;
|
||||
import reactor.core.publisher.Mono;
|
||||
import reactor.core.publisher.Signal;
|
||||
|
@ -195,8 +197,11 @@ public class AuditService implements Closeable {
|
|||
if (sig.getContextView().hasKey(key)) {
|
||||
return sig.getContextView().<Mono<SecurityContext>>get(key)
|
||||
.map(context -> context.getAuthentication().getPrincipal())
|
||||
.cast(RbacUser.class)
|
||||
.map(user -> new AuthenticatedUser(user.name(), user.groups()))
|
||||
.cast(UserDetails.class)
|
||||
.map(user -> {
|
||||
var roles = user.getAuthorities().stream().map(GrantedAuthority::getAuthority).collect(Collectors.toSet());
|
||||
return new AuthenticatedUser(user.getUsername(), roles);
|
||||
})
|
||||
.switchIfEmpty(NO_AUTH_USER);
|
||||
} else {
|
||||
return NO_AUTH_USER;
|
||||
|
|
|
@ -11,6 +11,9 @@ import org.apache.kafka.common.Node;
|
|||
|
||||
class WellKnownMetrics {
|
||||
|
||||
private static final String BROKER_TOPIC_METRICS = "BrokerTopicMetrics";
|
||||
private static final String FIFTEEN_MINUTE_RATE = "FifteenMinuteRate";
|
||||
|
||||
// per broker
|
||||
final Map<Integer, BigDecimal> brokerBytesInFifteenMinuteRate = new HashMap<>();
|
||||
final Map<Integer, BigDecimal> brokerBytesOutFifteenMinuteRate = new HashMap<>();
|
||||
|
@ -36,15 +39,15 @@ class WellKnownMetrics {
|
|||
if (!brokerBytesInFifteenMinuteRate.containsKey(node.id())
|
||||
&& rawMetric.labels().size() == 1
|
||||
&& "BytesInPerSec".equalsIgnoreCase(rawMetric.labels().get("name"))
|
||||
&& containsIgnoreCase(name, "BrokerTopicMetrics")
|
||||
&& endsWithIgnoreCase(name, "FifteenMinuteRate")) {
|
||||
&& containsIgnoreCase(name, BROKER_TOPIC_METRICS)
|
||||
&& endsWithIgnoreCase(name, FIFTEEN_MINUTE_RATE)) {
|
||||
brokerBytesInFifteenMinuteRate.put(node.id(), rawMetric.value());
|
||||
}
|
||||
if (!brokerBytesOutFifteenMinuteRate.containsKey(node.id())
|
||||
&& rawMetric.labels().size() == 1
|
||||
&& "BytesOutPerSec".equalsIgnoreCase(rawMetric.labels().get("name"))
|
||||
&& containsIgnoreCase(name, "BrokerTopicMetrics")
|
||||
&& endsWithIgnoreCase(name, "FifteenMinuteRate")) {
|
||||
&& containsIgnoreCase(name, BROKER_TOPIC_METRICS)
|
||||
&& endsWithIgnoreCase(name, FIFTEEN_MINUTE_RATE)) {
|
||||
brokerBytesOutFifteenMinuteRate.put(node.id(), rawMetric.value());
|
||||
}
|
||||
}
|
||||
|
@ -53,8 +56,8 @@ class WellKnownMetrics {
|
|||
String name = rawMetric.name();
|
||||
String topic = rawMetric.labels().get("topic");
|
||||
if (topic != null
|
||||
&& containsIgnoreCase(name, "BrokerTopicMetrics")
|
||||
&& endsWithIgnoreCase(name, "FifteenMinuteRate")) {
|
||||
&& containsIgnoreCase(name, BROKER_TOPIC_METRICS)
|
||||
&& endsWithIgnoreCase(name, FIFTEEN_MINUTE_RATE)) {
|
||||
String nameProperty = rawMetric.labels().get("name");
|
||||
if ("BytesInPerSec".equalsIgnoreCase(nameProperty)) {
|
||||
bytesInFifteenMinuteRate.compute(topic, (k, v) -> v == null ? rawMetric.value() : v.add(rawMetric.value()));
|
||||
|
|
|
@ -52,6 +52,7 @@ import reactor.core.publisher.Mono;
|
|||
public class AccessControlService {
|
||||
|
||||
private static final String ACCESS_DENIED = "Access denied";
|
||||
private static final String ACTIONS_ARE_EMPTY = "actions are empty";
|
||||
|
||||
@Nullable
|
||||
private final InMemoryReactiveClientRegistrationRepository clientRegistrationRepository;
|
||||
|
@ -206,7 +207,7 @@ public class AccessControlService {
|
|||
if (context.getTopic() == null && context.getTopicActions().isEmpty()) {
|
||||
return true;
|
||||
}
|
||||
Assert.isTrue(!context.getTopicActions().isEmpty(), "actions are empty");
|
||||
Assert.isTrue(!context.getTopicActions().isEmpty(), ACTIONS_ARE_EMPTY);
|
||||
|
||||
Set<String> requiredActions = context.getTopicActions()
|
||||
.stream()
|
||||
|
@ -243,7 +244,7 @@ public class AccessControlService {
|
|||
if (context.getConsumerGroup() == null && context.getConsumerGroupActions().isEmpty()) {
|
||||
return true;
|
||||
}
|
||||
Assert.isTrue(!context.getConsumerGroupActions().isEmpty(), "actions are empty");
|
||||
Assert.isTrue(!context.getConsumerGroupActions().isEmpty(), ACTIONS_ARE_EMPTY);
|
||||
|
||||
Set<String> requiredActions = context.getConsumerGroupActions()
|
||||
.stream()
|
||||
|
@ -276,7 +277,7 @@ public class AccessControlService {
|
|||
if (context.getSchema() == null && context.getSchemaActions().isEmpty()) {
|
||||
return true;
|
||||
}
|
||||
Assert.isTrue(!context.getSchemaActions().isEmpty(), "actions are empty");
|
||||
Assert.isTrue(!context.getSchemaActions().isEmpty(), ACTIONS_ARE_EMPTY);
|
||||
|
||||
Set<String> requiredActions = context.getSchemaActions()
|
||||
.stream()
|
||||
|
@ -309,7 +310,7 @@ public class AccessControlService {
|
|||
if (context.getConnect() == null && context.getConnectActions().isEmpty()) {
|
||||
return true;
|
||||
}
|
||||
Assert.isTrue(!context.getConnectActions().isEmpty(), "actions are empty");
|
||||
Assert.isTrue(!context.getConnectActions().isEmpty(), ACTIONS_ARE_EMPTY);
|
||||
|
||||
Set<String> requiredActions = context.getConnectActions()
|
||||
.stream()
|
||||
|
|
|
@ -59,8 +59,8 @@ public class CognitoAuthorityExtractor implements ProviderAuthorityExtractor {
|
|||
.stream()
|
||||
.filter(s -> s.getProvider().equals(Provider.OAUTH_COGNITO))
|
||||
.filter(s -> s.getType().equals("group"))
|
||||
.anyMatch(subject -> Stream.of(groups)
|
||||
.map(Object::toString)
|
||||
.anyMatch(subject -> groups
|
||||
.stream()
|
||||
.anyMatch(cognitoGroup -> cognitoGroup.equals(subject.getValue()))
|
||||
))
|
||||
.map(Role::getName)
|
||||
|
|
|
@ -5,4 +5,5 @@ package com.provectus.kafka.ui.util.annotation;
|
|||
* should be marked with this annotation to make further update process easier.
|
||||
*/
|
||||
public @interface KafkaClientInternalsDependant {
|
||||
String value() default "";
|
||||
}
|
||||
|
|
|
@ -43,6 +43,8 @@ public class JsonAvroConversion {
|
|||
|
||||
private static final JsonMapper MAPPER = new JsonMapper();
|
||||
private static final Schema NULL_SCHEMA = Schema.create(Schema.Type.NULL);
|
||||
private static final String FORMAT = "format";
|
||||
private static final String DATE_TIME = "date-time";
|
||||
|
||||
// converts json into Object that is expected input for KafkaAvroSerializer
|
||||
// (with AVRO_USE_LOGICAL_TYPE_CONVERTERS flat enabled!)
|
||||
|
@ -347,7 +349,7 @@ public class JsonAvroConversion {
|
|||
new SimpleFieldSchema(
|
||||
new SimpleJsonType(
|
||||
JsonType.Type.STRING,
|
||||
Map.of("format", new TextNode("uuid"))))
|
||||
Map.of(FORMAT, new TextNode("uuid"))))
|
||||
),
|
||||
|
||||
DECIMAL("decimal",
|
||||
|
@ -385,7 +387,7 @@ public class JsonAvroConversion {
|
|||
new SimpleFieldSchema(
|
||||
new SimpleJsonType(
|
||||
JsonType.Type.STRING,
|
||||
Map.of("format", new TextNode("date"))))
|
||||
Map.of(FORMAT, new TextNode("date"))))
|
||||
),
|
||||
|
||||
TIME_MILLIS("time-millis",
|
||||
|
@ -406,7 +408,7 @@ public class JsonAvroConversion {
|
|||
new SimpleFieldSchema(
|
||||
new SimpleJsonType(
|
||||
JsonType.Type.STRING,
|
||||
Map.of("format", new TextNode("time"))))
|
||||
Map.of(FORMAT, new TextNode("time"))))
|
||||
),
|
||||
|
||||
TIME_MICROS("time-micros",
|
||||
|
@ -427,7 +429,7 @@ public class JsonAvroConversion {
|
|||
new SimpleFieldSchema(
|
||||
new SimpleJsonType(
|
||||
JsonType.Type.STRING,
|
||||
Map.of("format", new TextNode("time"))))
|
||||
Map.of(FORMAT, new TextNode("time"))))
|
||||
),
|
||||
|
||||
TIMESTAMP_MILLIS("timestamp-millis",
|
||||
|
@ -448,7 +450,7 @@ public class JsonAvroConversion {
|
|||
new SimpleFieldSchema(
|
||||
new SimpleJsonType(
|
||||
JsonType.Type.STRING,
|
||||
Map.of("format", new TextNode("date-time"))))
|
||||
Map.of(FORMAT, new TextNode(DATE_TIME))))
|
||||
),
|
||||
|
||||
TIMESTAMP_MICROS("timestamp-micros",
|
||||
|
@ -473,7 +475,7 @@ public class JsonAvroConversion {
|
|||
new SimpleFieldSchema(
|
||||
new SimpleJsonType(
|
||||
JsonType.Type.STRING,
|
||||
Map.of("format", new TextNode("date-time"))))
|
||||
Map.of(FORMAT, new TextNode(DATE_TIME))))
|
||||
),
|
||||
|
||||
LOCAL_TIMESTAMP_MILLIS("local-timestamp-millis",
|
||||
|
@ -491,7 +493,7 @@ public class JsonAvroConversion {
|
|||
new SimpleFieldSchema(
|
||||
new SimpleJsonType(
|
||||
JsonType.Type.STRING,
|
||||
Map.of("format", new TextNode("date-time"))))
|
||||
Map.of(FORMAT, new TextNode(DATE_TIME))))
|
||||
),
|
||||
|
||||
LOCAL_TIMESTAMP_MICROS("local-timestamp-micros",
|
||||
|
@ -508,7 +510,7 @@ public class JsonAvroConversion {
|
|||
new SimpleFieldSchema(
|
||||
new SimpleJsonType(
|
||||
JsonType.Type.STRING,
|
||||
Map.of("format", new TextNode("date-time"))))
|
||||
Map.of(FORMAT, new TextNode(DATE_TIME))))
|
||||
);
|
||||
|
||||
private final String name;
|
||||
|
|
|
@ -37,6 +37,9 @@ import reactor.util.function.Tuples;
|
|||
|
||||
public class ProtobufSchemaConverter implements JsonSchemaConverter<Descriptors.Descriptor> {
|
||||
|
||||
private static final String MAXIMUM = "maximum";
|
||||
private static final String MINIMUM = "minimum";
|
||||
|
||||
private final Set<String> simpleTypesWrapperNames = Set.of(
|
||||
BoolValue.getDescriptor().getFullName(),
|
||||
Int32Value.getDescriptor().getFullName(),
|
||||
|
@ -156,15 +159,15 @@ public class ProtobufSchemaConverter implements JsonSchemaConverter<Descriptors.
|
|||
case INT32, FIXED32, SFIXED32, SINT32 -> new SimpleJsonType(
|
||||
JsonType.Type.INTEGER,
|
||||
Map.of(
|
||||
"maximum", IntNode.valueOf(Integer.MAX_VALUE),
|
||||
"minimum", IntNode.valueOf(Integer.MIN_VALUE)
|
||||
MAXIMUM, IntNode.valueOf(Integer.MAX_VALUE),
|
||||
MINIMUM, IntNode.valueOf(Integer.MIN_VALUE)
|
||||
)
|
||||
);
|
||||
case UINT32 -> new SimpleJsonType(
|
||||
JsonType.Type.INTEGER,
|
||||
Map.of(
|
||||
"maximum", LongNode.valueOf(UnsignedInteger.MAX_VALUE.longValue()),
|
||||
"minimum", IntNode.valueOf(0)
|
||||
MAXIMUM, LongNode.valueOf(UnsignedInteger.MAX_VALUE.longValue()),
|
||||
MINIMUM, IntNode.valueOf(0)
|
||||
)
|
||||
);
|
||||
//TODO: actually all *64 types will be printed with quotes (as strings),
|
||||
|
@ -173,15 +176,15 @@ public class ProtobufSchemaConverter implements JsonSchemaConverter<Descriptors.
|
|||
case INT64, FIXED64, SFIXED64, SINT64 -> new SimpleJsonType(
|
||||
JsonType.Type.INTEGER,
|
||||
Map.of(
|
||||
"maximum", LongNode.valueOf(Long.MAX_VALUE),
|
||||
"minimum", LongNode.valueOf(Long.MIN_VALUE)
|
||||
MAXIMUM, LongNode.valueOf(Long.MAX_VALUE),
|
||||
MINIMUM, LongNode.valueOf(Long.MIN_VALUE)
|
||||
)
|
||||
);
|
||||
case UINT64 -> new SimpleJsonType(
|
||||
JsonType.Type.INTEGER,
|
||||
Map.of(
|
||||
"maximum", new BigIntegerNode(UnsignedLong.MAX_VALUE.bigIntegerValue()),
|
||||
"minimum", LongNode.valueOf(0)
|
||||
MAXIMUM, new BigIntegerNode(UnsignedLong.MAX_VALUE.bigIntegerValue()),
|
||||
MINIMUM, LongNode.valueOf(0)
|
||||
)
|
||||
);
|
||||
case MESSAGE, GROUP -> new SimpleJsonType(JsonType.Type.OBJECT);
|
||||
|
|
|
@@ -763,6 +763,33 @@ paths:
        404:
          description: Not found

  /api/clusters/{clusterName}/topics/{topicName}/activeproducers:
    get:
      tags:
        - Topics
      summary: get producer states for topic
      operationId: getActiveProducerStates
      parameters:
        - name: clusterName
          in: path
          required: true
          schema:
            type: string
        - name: topicName
          in: path
          required: true
          schema:
            type: string
      responses:
        200:
          description: OK
          content:
            application/json:
              schema:
                type: array
                items:
                  $ref: '#/components/schemas/TopicProducerState'

  /api/clusters/{clusterName}/topics/{topicName}/consumer-groups:
    get:
      tags:
@@ -2619,6 +2646,31 @@ components:
        - PROTOBUF
        - UNKNOWN

    TopicProducerState:
      type: object
      properties:
        partition:
          type: integer
          format: int32
        producerId:
          type: integer
          format: int64
        producerEpoch:
          type: integer
          format: int32
        lastSequence:
          type: integer
          format: int32
        lastTimestampMs:
          type: integer
          format: int64
        coordinatorEpoch:
          type: integer
          format: int32
        currentTransactionStartOffset:
          type: integer
          format: int64

    ConsumerGroup:
      discriminator:
        propertyName: inherit
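Illustration only: once the endpoint above is generated and wired up, it can be exercised with a plain Spring WebClient call such as the sketch below (the base URL, cluster, and topic names are placeholders, and the response is read as generic JSON maps rather than the generated client model):

import java.util.List;
import java.util.Map;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.web.reactive.function.client.WebClient;

public class ActiveProducersClientSketch {
  public static void main(String[] args) {
    WebClient client = WebClient.create("http://localhost:8080"); // placeholder base URL
    List<Map<String, Object>> producerStates = client.get()
        .uri("/api/clusters/{clusterName}/topics/{topicName}/activeproducers", "local", "my-topic")
        .retrieve()
        .bodyToFlux(new ParameterizedTypeReference<Map<String, Object>>() {})
        .collectList()
        .block();
    producerStates.forEach(state ->
        System.out.println(state.get("partition") + " -> producerId " + state.get("producerId")));
  }
}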
@ -10,25 +10,27 @@ import lombok.experimental.Accessors;
|
|||
@Accessors(chain = true)
|
||||
public class Schema {
|
||||
|
||||
private static final String USER_DIR = "user.dir";
|
||||
|
||||
private String name, valuePath;
|
||||
private SchemaType type;
|
||||
|
||||
public static Schema createSchemaAvro() {
|
||||
return new Schema().setName("schema_avro-" + randomAlphabetic(5))
|
||||
.setType(SchemaType.AVRO)
|
||||
.setValuePath(System.getProperty("user.dir") + "/src/main/resources/testData/schemas/schema_avro_value.json");
|
||||
.setValuePath(System.getProperty(USER_DIR) + "/src/main/resources/testData/schemas/schema_avro_value.json");
|
||||
}
|
||||
|
||||
public static Schema createSchemaJson() {
|
||||
return new Schema().setName("schema_json-" + randomAlphabetic(5))
|
||||
.setType(SchemaType.JSON)
|
||||
.setValuePath(System.getProperty("user.dir") + "/src/main/resources/testData/schemas/schema_json_Value.json");
|
||||
.setValuePath(System.getProperty(USER_DIR) + "/src/main/resources/testData/schemas/schema_json_Value.json");
|
||||
}
|
||||
|
||||
public static Schema createSchemaProtobuf() {
|
||||
return new Schema().setName("schema_protobuf-" + randomAlphabetic(5))
|
||||
.setType(SchemaType.PROTOBUF)
|
||||
.setValuePath(
|
||||
System.getProperty("user.dir") + "/src/main/resources/testData/schemas/schema_protobuf_value.txt");
|
||||
System.getProperty(USER_DIR) + "/src/main/resources/testData/schemas/schema_protobuf_value.txt");
|
||||
}
|
||||
}
|
||||
|
|
|
@ -16,6 +16,8 @@ import java.util.stream.Stream;
|
|||
|
||||
public class BrokersConfigTab extends BasePage {
|
||||
|
||||
protected List<SelenideElement> editBtn = $$x("//button[@aria-label='editAction']");
|
||||
protected SelenideElement searchByKeyField = $x("//input[@placeholder='Search by Key or Value']");
|
||||
protected SelenideElement sourceInfoIcon = $x("//div[text()='Source']/..//div/div[@class]");
|
||||
protected SelenideElement sourceInfoTooltip = $x("//div[text()='Source']/..//div/div[@style]");
|
||||
protected ElementsCollection editBtns = $$x("//button[@aria-label='editAction']");
|
||||
|
|
|
@ -19,6 +19,8 @@ import io.qameta.allure.Step;
|
|||
|
||||
public class TopicCreateEditForm extends BasePage {
|
||||
|
||||
private static final String RETENTION_BYTES = "retentionBytes";
|
||||
|
||||
protected SelenideElement timeToRetainField = $x("//input[@id='timeToRetain']");
|
||||
protected SelenideElement partitionsField = $x("//input[@name='partitions']");
|
||||
protected SelenideElement nameField = $(id("topicFormName"));
|
||||
|
@ -138,12 +140,12 @@ public class TopicCreateEditForm extends BasePage {
|
|||
|
||||
@Step
|
||||
public TopicCreateEditForm selectRetentionBytes(String visibleValue) {
|
||||
return selectFromDropDownByVisibleText("retentionBytes", visibleValue);
|
||||
return selectFromDropDownByVisibleText(RETENTION_BYTES, visibleValue);
|
||||
}
|
||||
|
||||
@Step
|
||||
public TopicCreateEditForm selectRetentionBytes(Long optionValue) {
|
||||
return selectFromDropDownByOptionValue("retentionBytes", optionValue.toString());
|
||||
return selectFromDropDownByOptionValue(RETENTION_BYTES, optionValue.toString());
|
||||
}
|
||||
|
||||
@Step
|
||||
|
@ -202,7 +204,7 @@ public class TopicCreateEditForm extends BasePage {
|
|||
|
||||
@Step
|
||||
public String getMaxSizeOnDisk() {
|
||||
return new KafkaUiSelectElement("retentionBytes").getCurrentValue();
|
||||
return new KafkaUiSelectElement(RETENTION_BYTES).getCurrentValue();
|
||||
}
|
||||
|
||||
@Step
|
||||
|
|
|
@ -1 +1 @@
|
|||
v16.15.0
|
||||
v18.17.1
|
||||
|
|
|
@ -106,7 +106,7 @@
|
|||
"vite-plugin-ejs": "^1.6.4"
|
||||
},
|
||||
"engines": {
|
||||
"node": "v16.15.0",
|
||||
"pnpm": "^7.4.0"
|
||||
"node": "v18.17.1",
|
||||
"pnpm": "^8.6.12"
|
||||
}
|
||||
}
|
||||
|
|
3210
kafka-ui-react-app/pnpm-lock.yaml
generated
3210
kafka-ui-react-app/pnpm-lock.yaml
generated
File diff suppressed because it is too large
|
@ -19,6 +19,8 @@ import BrokerLogdir from 'components/Brokers/Broker/BrokerLogdir/BrokerLogdir';
|
|||
import BrokerMetrics from 'components/Brokers/Broker/BrokerMetrics/BrokerMetrics';
|
||||
import Navbar from 'components/common/Navigation/Navbar.styled';
|
||||
import PageLoader from 'components/common/PageLoader/PageLoader';
|
||||
import { ActionNavLink } from 'components/common/ActionComponent';
|
||||
import { Action, ResourceType } from 'generated-sources';
|
||||
|
||||
import Configs from './Configs/Configs';
|
||||
|
||||
|
@ -71,12 +73,16 @@ const Broker: React.FC = () => {
|
|||
>
|
||||
Configs
|
||||
</NavLink>
|
||||
<NavLink
|
||||
<ActionNavLink
|
||||
to={clusterBrokerMetricsPath(clusterName, brokerId)}
|
||||
className={({ isActive }) => (isActive ? 'is-active' : '')}
|
||||
permission={{
|
||||
resource: ResourceType.CLUSTERCONFIG,
|
||||
action: Action.VIEW,
|
||||
}}
|
||||
>
|
||||
Metrics
|
||||
</NavLink>
|
||||
</ActionNavLink>
|
||||
</Navbar>
|
||||
<Suspense fallback={<PageLoader />}>
|
||||
<Routes>
|
||||
|
|
|
@ -34,14 +34,19 @@ const Configs: React.FC = () => {
|
|||
|
||||
const getData = () => {
|
||||
return data
|
||||
.filter(
|
||||
(item) =>
|
||||
item.name.toLocaleLowerCase().indexOf(keyword.toLocaleLowerCase()) >
|
||||
-1
|
||||
)
|
||||
.filter((item) => {
|
||||
const nameMatch = item.name
|
||||
.toLocaleLowerCase()
|
||||
.includes(keyword.toLocaleLowerCase());
|
||||
return nameMatch
|
||||
? true
|
||||
: item.value &&
|
||||
item.value
|
||||
.toLocaleLowerCase()
|
||||
.includes(keyword.toLocaleLowerCase()); // try to match the keyword on any of the item.value elements when nameMatch fails but item.value exists
|
||||
})
|
||||
.sort((a, b) => {
|
||||
if (a.source === b.source) return 0;
|
||||
|
||||
return a.source === ConfigSource.DYNAMIC_BROKER_CONFIG ? -1 : 1;
|
||||
});
|
||||
};
|
||||
|
@ -95,7 +100,7 @@ const Configs: React.FC = () => {
|
|||
<S.SearchWrapper>
|
||||
<Search
|
||||
onChange={setKeyword}
|
||||
placeholder="Search by Key"
|
||||
placeholder="Search by Key or Value"
|
||||
value={keyword}
|
||||
/>
|
||||
</S.SearchWrapper>
|
||||
|
|
|
@ -13,7 +13,7 @@ import { brokersPayload } from 'lib/fixtures/brokers';
|
|||
import { clusterStatsPayload } from 'lib/fixtures/clusters';
|
||||
|
||||
const clusterName = 'local';
|
||||
const brokerId = 1;
|
||||
const brokerId = 200;
|
||||
const activeClassName = 'is-active';
|
||||
const brokerLogdir = {
|
||||
pageText: 'brokerLogdir',
|
||||
|
|
|
@ -73,13 +73,13 @@ const BrokersList: React.FC = () => {
|
|||
header: 'Broker ID',
|
||||
accessorKey: 'brokerId',
|
||||
// eslint-disable-next-line react/no-unstable-nested-components
|
||||
cell: ({ row: { id }, getValue }) => (
|
||||
cell: ({ getValue }) => (
|
||||
<S.RowCell>
|
||||
<LinkCell
|
||||
value={`${getValue<string | number>()}`}
|
||||
to={encodeURIComponent(`${getValue<string | number>()}`)}
|
||||
/>
|
||||
{id === String(activeControllers) && (
|
||||
{getValue<string | number>() === activeControllers && (
|
||||
<Tooltip
|
||||
value={<CheckMarkRoundIcon />}
|
||||
content="Active Controller"
|
||||
|
|
|
@ -56,11 +56,11 @@ describe('BrokersList Component', () => {
|
|||
});
|
||||
it('opens broker when row clicked', async () => {
|
||||
renderComponent();
|
||||
await userEvent.click(screen.getByRole('cell', { name: '0' }));
|
||||
await userEvent.click(screen.getByRole('cell', { name: '100' }));
|
||||
|
||||
await waitFor(() =>
|
||||
expect(mockedUsedNavigate).toBeCalledWith(
|
||||
clusterBrokerPath(clusterName, '0')
|
||||
clusterBrokerPath(clusterName, '100')
|
||||
)
|
||||
);
|
||||
});
|
||||
|
@ -124,6 +124,39 @@ describe('BrokersList Component', () => {
|
|||
});
|
||||
});
|
||||
|
||||
describe('BrokersList', () => {
|
||||
describe('when the brokers are loaded', () => {
|
||||
const testActiveControllers = 0;
|
||||
beforeEach(() => {
|
||||
(useBrokers as jest.Mock).mockImplementation(() => ({
|
||||
data: brokersPayload,
|
||||
}));
|
||||
(useClusterStats as jest.Mock).mockImplementation(() => ({
|
||||
data: clusterStatsPayload,
|
||||
}));
|
||||
});
|
||||
|
||||
it(`Indicates correct active cluster`, async () => {
|
||||
renderComponent();
|
||||
await waitFor(() =>
|
||||
expect(screen.getByRole('tooltip')).toBeInTheDocument()
|
||||
);
|
||||
});
|
||||
it(`Correct display even if there is no active cluster: ${testActiveControllers} `, async () => {
|
||||
(useClusterStats as jest.Mock).mockImplementation(() => ({
|
||||
data: {
|
||||
...clusterStatsPayload,
|
||||
activeControllers: testActiveControllers,
|
||||
},
|
||||
}));
|
||||
renderComponent();
|
||||
await waitFor(() =>
|
||||
expect(screen.queryByRole('tooltip')).not.toBeInTheDocument()
|
||||
);
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
describe('when diskUsage is empty', () => {
|
||||
beforeEach(() => {
|
||||
(useBrokers as jest.Mock).mockImplementation(() => ({
|
||||
|
@ -157,11 +190,11 @@ describe('BrokersList Component', () => {
|
|||
});
|
||||
it('opens broker when row clicked', async () => {
|
||||
renderComponent();
|
||||
await userEvent.click(screen.getByRole('cell', { name: '1' }));
|
||||
await userEvent.click(screen.getByRole('cell', { name: '100' }));
|
||||
|
||||
await waitFor(() =>
|
||||
expect(mockedUsedNavigate).toBeCalledWith(
|
||||
clusterBrokerPath(clusterName, '1')
|
||||
clusterBrokerPath(clusterName, '100')
|
||||
)
|
||||
);
|
||||
});
|
||||
|
|
|
@ -15,7 +15,7 @@ enum Filters {
|
|||
PARTITION_COUNT = 'partitionCount',
|
||||
REPLICATION_FACTOR = 'replicationFactor',
|
||||
INSYNC_REPLICAS = 'inSyncReplicas',
|
||||
CLEANUP_POLICY = 'Delete',
|
||||
CLEANUP_POLICY = 'cleanUpPolicy',
|
||||
}
|
||||
|
||||
const New: React.FC = () => {
|
||||
|
|
|
@ -60,16 +60,16 @@ describe('New', () => {
|
|||
await userEvent.clear(screen.getByPlaceholderText('Topic Name'));
|
||||
await userEvent.tab();
|
||||
await expect(
|
||||
screen.getByText('name is a required field')
|
||||
screen.getByText('Topic Name is required')
|
||||
).toBeInTheDocument();
|
||||
await userEvent.type(
|
||||
screen.getByLabelText('Number of partitions *'),
|
||||
screen.getByLabelText('Number of Partitions *'),
|
||||
minValue
|
||||
);
|
||||
await userEvent.clear(screen.getByLabelText('Number of partitions *'));
|
||||
await userEvent.clear(screen.getByLabelText('Number of Partitions *'));
|
||||
await userEvent.tab();
|
||||
await expect(
|
||||
screen.getByText('Number of partitions is required and must be a number')
|
||||
screen.getByText('Number of Partitions is required and must be a number')
|
||||
).toBeInTheDocument();
|
||||
|
||||
expect(createTopicMock).not.toHaveBeenCalled();
|
||||
|
@ -89,7 +89,7 @@ describe('New', () => {
|
|||
renderComponent(clusterTopicNewPath(clusterName));
|
||||
await userEvent.type(screen.getByPlaceholderText('Topic Name'), topicName);
|
||||
await userEvent.type(
|
||||
screen.getByLabelText('Number of partitions *'),
|
||||
screen.getByLabelText('Number of Partitions *'),
|
||||
minValue
|
||||
);
|
||||
await userEvent.click(screen.getByText('Create topic'));
|
||||
|
|
|
@ -44,9 +44,11 @@ const Metrics: React.FC = () => {
|
|||
if (data.progress) {
|
||||
return (
|
||||
<S.ProgressContainer>
|
||||
<S.ProgressPct>
|
||||
{Math.floor(data.progress.completenessPercent || 0)}%
|
||||
</S.ProgressPct>
|
||||
<S.ProgressBarWrapper>
|
||||
<ProgressBar completed={data.progress.completenessPercent || 0} />
|
||||
<span> {Math.floor(data.progress.completenessPercent || 0)} %</span>
|
||||
</S.ProgressBarWrapper>
|
||||
<ActionButton
|
||||
onClick={async () => {
|
||||
|
|
|
@ -42,3 +42,10 @@ export const ProgressBarWrapper = styled.div`
|
|||
align-items: center;
|
||||
width: 280px;
|
||||
`;
|
||||
|
||||
export const ProgressPct = styled.span`
|
||||
font-size: 15px;
|
||||
font-weight: bold;
|
||||
line-height: 1.5;
|
||||
color: ${({ theme }) => theme.statictics.progressPctColor};
|
||||
`;
|
||||
|
|
|
@ -1,4 +1,5 @@
|
|||
import styled from 'styled-components';
|
||||
import Input from 'components/common/Input/Input';
|
||||
|
||||
export const Column = styled.div`
|
||||
display: flex;
|
||||
|
@ -16,6 +17,10 @@ export const CustomParamsHeading = styled.h4`
|
|||
color: ${({ theme }) => theme.heading.h4};
|
||||
`;
|
||||
|
||||
export const MessageSizeInput = styled(Input)`
|
||||
min-width: 195px;
|
||||
`;
|
||||
|
||||
export const Label = styled.div`
|
||||
display: flex;
|
||||
gap: 16px;
|
||||
|
|
|
@ -109,12 +109,12 @@ const TopicForm: React.FC<Props> = ({
|
|||
{!isEditing && (
|
||||
<div>
|
||||
<InputLabel htmlFor="topicFormNumberOfPartitions">
|
||||
Number of partitions *
|
||||
Number of Partitions *
|
||||
</InputLabel>
|
||||
<Input
|
||||
id="topicFormNumberOfPartitions"
|
||||
type="number"
|
||||
placeholder="Number of partitions"
|
||||
placeholder="Number of Partitions"
|
||||
min="1"
|
||||
name="partitions"
|
||||
positiveOnly
|
||||
|
@ -228,7 +228,7 @@ const TopicForm: React.FC<Props> = ({
|
|||
<InputLabel htmlFor="topicFormMaxMessageBytes">
|
||||
Maximum message size in bytes
|
||||
</InputLabel>
|
||||
<Input
|
||||
<S.MessageSizeInput
|
||||
id="topicFormMaxMessageBytes"
|
||||
type="number"
|
||||
placeholder="Maximum message size"
|
||||
|
|
|
@ -37,7 +37,7 @@ describe('TopicForm', () => {
|
|||
|
||||
expectByRoleAndNameToBeInDocument('textbox', 'Topic Name *');
|
||||
|
||||
expectByRoleAndNameToBeInDocument('spinbutton', 'Number of partitions *');
|
||||
expectByRoleAndNameToBeInDocument('spinbutton', 'Number of Partitions *');
|
||||
expectByRoleAndNameToBeInDocument('spinbutton', 'Replication Factor');
|
||||
|
||||
expectByRoleAndNameToBeInDocument('spinbutton', 'Min In Sync Replicas');
|
||||
|
|
|
@ -7,11 +7,13 @@ export const clusterName = 'local';
|
|||
export const validPermission = {
|
||||
resource: ResourceType.TOPIC,
|
||||
action: Action.CREATE,
|
||||
value: 'topic',
|
||||
};
|
||||
|
||||
export const invalidPermission = {
|
||||
resource: ResourceType.SCHEMA,
|
||||
action: Action.DELETE,
|
||||
value: 'test',
|
||||
};
|
||||
|
||||
const roles = [
|
||||
|
|
|
@ -7,6 +7,7 @@ const CheckMarkRoundIcon: React.FC = () => {
|
|||
height="14"
|
||||
viewBox="0 0 14 14"
|
||||
fill="none"
|
||||
role="tooltip"
|
||||
xmlns="http://www.w3.org/2000/svg"
|
||||
>
|
||||
<path
|
||||
|
|
|
@ -14,6 +14,7 @@ describe('Permission Helpers', () => {
|
|||
clusters: [clusterName1],
|
||||
resource: ResourceType.TOPIC,
|
||||
actions: [Action.VIEW, Action.CREATE],
|
||||
value: '.*',
|
||||
},
|
||||
{
|
||||
clusters: [clusterName1],
|
||||
|
@ -24,11 +25,18 @@ describe('Permission Helpers', () => {
|
|||
clusters: [clusterName1, clusterName2],
|
||||
resource: ResourceType.SCHEMA,
|
||||
actions: [Action.VIEW],
|
||||
value: '.*',
|
||||
},
|
||||
{
|
||||
clusters: [clusterName1, clusterName2],
|
||||
resource: ResourceType.CONNECT,
|
||||
actions: [Action.VIEW],
|
||||
value: '.*',
|
||||
},
|
||||
{
|
||||
clusters: [clusterName1],
|
||||
resource: ResourceType.APPLICATIONCONFIG,
|
||||
actions: [Action.EDIT],
|
||||
},
|
||||
{
|
||||
clusters: [clusterName1],
|
||||
|
@ -39,6 +47,7 @@ describe('Permission Helpers', () => {
|
|||
clusters: [clusterName1],
|
||||
resource: ResourceType.CONSUMER,
|
||||
actions: [Action.DELETE],
|
||||
value: '.*',
|
||||
},
|
||||
{
|
||||
clusters: [clusterName1],
|
||||
|
@ -46,6 +55,16 @@ describe('Permission Helpers', () => {
|
|||
actions: [Action.EDIT, Action.DELETE, Action.CREATE],
|
||||
value: '123.*',
|
||||
},
|
||||
{
|
||||
clusters: [clusterName1],
|
||||
resource: ResourceType.ACL,
|
||||
actions: [Action.VIEW],
|
||||
},
|
||||
{
|
||||
clusters: [clusterName1],
|
||||
resource: ResourceType.AUDIT,
|
||||
actions: [Action.VIEW],
|
||||
},
|
||||
{
|
||||
clusters: [clusterName1, clusterName2],
|
||||
resource: ResourceType.TOPIC,
|
||||
|
@ -58,6 +77,12 @@ describe('Permission Helpers', () => {
|
|||
value: '.*',
|
||||
actions: [Action.EDIT, Action.DELETE],
|
||||
},
|
||||
{
|
||||
clusters: [clusterName1, clusterName2],
|
||||
resource: ResourceType.TOPIC,
|
||||
value: 'bobross.*',
|
||||
actions: [Action.VIEW, Action.MESSAGES_READ],
|
||||
},
|
||||
];
|
||||
|
||||
const roles = modifyRolesData(userPermissionsMock);
|
||||
|
@ -100,11 +125,11 @@ describe('Permission Helpers', () => {
|
|||
|
||||
expect(result.size).toBe(2);
|
||||
|
||||
expect(cluster1Map?.size).toBe(6);
|
||||
expect(cluster1Map?.size).toBe(9);
|
||||
expect(cluster2Map?.size).toBe(3);
|
||||
|
||||
// clusterMap1
|
||||
expect(cluster1Map?.get(ResourceType.TOPIC)).toHaveLength(3);
|
||||
expect(cluster1Map?.get(ResourceType.TOPIC)).toHaveLength(4);
|
||||
expect(cluster1Map?.get(ResourceType.SCHEMA)).toHaveLength(2);
|
||||
expect(cluster1Map?.get(ResourceType.CONSUMER)).toHaveLength(1);
|
||||
expect(cluster1Map?.get(ResourceType.CLUSTERCONFIG)).toHaveLength(1);
|
||||
|
@ -177,33 +202,13 @@ describe('Permission Helpers', () => {
|
|||
).toBeFalsy();
|
||||
});
|
||||
|
||||
it('should check if the isPermitted returns the correct value without name values', () => {
|
||||
it('should check if the isPermitted returns the correct value without resource values (exempt list)', () => {
|
||||
expect(
|
||||
isPermitted({
|
||||
roles,
|
||||
clusterName: clusterName1,
|
||||
resource: ResourceType.TOPIC,
|
||||
action: Action.VIEW,
|
||||
rbacFlag: true,
|
||||
})
|
||||
).toBeTruthy();
|
||||
|
||||
expect(
|
||||
isPermitted({
|
||||
roles,
|
||||
clusterName: clusterName2,
|
||||
resource: ResourceType.TOPIC,
|
||||
action: Action.VIEW,
|
||||
rbacFlag: true,
|
||||
})
|
||||
).toBeFalsy();
|
||||
|
||||
expect(
|
||||
isPermitted({
|
||||
roles,
|
||||
clusterName: clusterName1,
|
||||
resource: ResourceType.SCHEMA,
|
||||
action: Action.VIEW,
|
||||
resource: ResourceType.KSQL,
|
||||
action: Action.EXECUTE,
|
||||
rbacFlag: true,
|
||||
})
|
||||
).toBeTruthy();
|
||||
|
@ -222,8 +227,8 @@ describe('Permission Helpers', () => {
|
|||
isPermitted({
|
||||
roles,
|
||||
clusterName: clusterName1,
|
||||
resource: ResourceType.KSQL,
|
||||
action: Action.EXECUTE,
|
||||
resource: ResourceType.APPLICATIONCONFIG,
|
||||
action: Action.EDIT,
|
||||
rbacFlag: true,
|
||||
})
|
||||
).toBeTruthy();
|
||||
|
@ -231,23 +236,33 @@ describe('Permission Helpers', () => {
|
|||
expect(
|
||||
isPermitted({
|
||||
roles,
|
||||
clusterName: clusterName2,
|
||||
resource: ResourceType.KSQL,
|
||||
action: Action.EXECUTE,
|
||||
clusterName: clusterName1,
|
||||
resource: ResourceType.ACL,
|
||||
action: Action.VIEW,
|
||||
rbacFlag: true,
|
||||
})
|
||||
).toBeTruthy();
|
||||
|
||||
expect(
|
||||
isPermitted({
|
||||
roles,
|
||||
clusterName: clusterName1,
|
||||
resource: ResourceType.AUDIT,
|
||||
action: Action.VIEW,
|
||||
rbacFlag: true,
|
||||
})
|
||||
).toBeTruthy();
|
||||
|
||||
expect(
|
||||
isPermitted({
|
||||
roles,
|
||||
clusterName: clusterName1,
|
||||
resource: ResourceType.TOPIC,
|
||||
action: Action.VIEW,
|
||||
rbacFlag: true,
|
||||
})
|
||||
).toBeFalsy();
|
||||
|
||||
expect(
|
||||
isPermitted({
|
||||
roles,
|
||||
clusterName: clusterName2,
|
||||
resource: ResourceType.SCHEMA,
|
||||
action: Action.VIEW,
|
||||
rbacFlag: true,
|
||||
})
|
||||
).toBeTruthy();
|
||||
|
||||
expect(
|
||||
isPermitted({
|
||||
roles,
|
||||
|
@ -256,17 +271,17 @@ describe('Permission Helpers', () => {
|
|||
action: Action.VIEW,
|
||||
rbacFlag: true,
|
||||
})
|
||||
).toBeTruthy();
|
||||
).toBeFalsy();
|
||||
|
||||
expect(
|
||||
isPermitted({
|
||||
roles,
|
||||
clusterName: clusterName2,
|
||||
resource: ResourceType.CONNECT,
|
||||
clusterName: clusterName1,
|
||||
resource: ResourceType.CONSUMER,
|
||||
action: Action.VIEW,
|
||||
rbacFlag: true,
|
||||
})
|
||||
).toBeTruthy();
|
||||
).toBeFalsy();
|
||||
|
||||
expect(
|
||||
isPermitted({
|
||||
|
@ -276,7 +291,7 @@ describe('Permission Helpers', () => {
|
|||
action: Action.VIEW,
|
||||
rbacFlag: true,
|
||||
})
|
||||
).toBeTruthy();
|
||||
).toBeFalsy();
|
||||
});
|
||||
|
||||
it('should check if the isPermitted returns the correct value with name values', () => {
|
||||
|
@ -445,7 +460,7 @@ describe('Permission Helpers', () => {
|
|||
value: '123456',
|
||||
rbacFlag: true,
|
||||
})
|
||||
).toBeFalsy();
|
||||
).toBeTruthy();
|
||||
|
||||
expect(
|
||||
isPermitted({
|
||||
|
@ -468,6 +483,17 @@ describe('Permission Helpers', () => {
|
|||
rbacFlag: true,
|
||||
})
|
||||
).toBeTruthy();
|
||||
|
||||
expect(
|
||||
isPermitted({
|
||||
roles,
|
||||
clusterName: clusterName1,
|
||||
resource: ResourceType.TOPIC,
|
||||
action: [Action.MESSAGES_READ],
|
||||
value: 'bobross-test',
|
||||
rbacFlag: true,
|
||||
})
|
||||
).toBeTruthy();
|
||||
});
|
||||
|
||||
it('should check the rbac flag and works with permissions accordingly', () => {
|
||||
|
|
|
@ -1,8 +1,8 @@
|
|||
import { BrokerConfig, BrokersLogdirs, ConfigSource } from 'generated-sources';
|
||||
|
||||
export const brokersPayload = [
|
||||
{ id: 1, host: 'b-1.test.kafka.amazonaws.com', port: 9092 },
|
||||
{ id: 2, host: 'b-2.test.kafka.amazonaws.com', port: 9092 },
|
||||
{ id: 100, host: 'b-1.test.kafka.amazonaws.com', port: 9092 },
|
||||
{ id: 200, host: 'b-2.test.kafka.amazonaws.com', port: 9092 },
|
||||
];
|
||||
|
||||
const partition = {
|
||||
|
|
|
@ -32,15 +32,15 @@ export const clustersPayload: Cluster[] = [
|
|||
|
||||
export const clusterStatsPayload = {
|
||||
brokerCount: 2,
|
||||
activeControllers: 1,
|
||||
activeControllers: 100,
|
||||
onlinePartitionCount: 138,
|
||||
offlinePartitionCount: 0,
|
||||
inSyncReplicasCount: 239,
|
||||
outOfSyncReplicasCount: 0,
|
||||
underReplicatedPartitionCount: 0,
|
||||
diskUsage: [
|
||||
{ brokerId: 0, segmentSize: 334567, segmentCount: 245 },
|
||||
{ brokerId: 1, segmentSize: 12345678, segmentCount: 121 },
|
||||
{ brokerId: 100, segmentSize: 334567, segmentCount: 245 },
|
||||
{ brokerId: 200, segmentSize: 12345678, segmentCount: 121 },
|
||||
],
|
||||
version: '2.2.1',
|
||||
};
|
||||
|
|
|
@ -76,7 +76,6 @@ const formatTopicCreation = (form: TopicFormData): TopicCreation => {
|
|||
partitions,
|
||||
replicationFactor,
|
||||
cleanupPolicy,
|
||||
retentionBytes,
|
||||
retentionMs,
|
||||
maxMessageBytes,
|
||||
minInSyncReplicas,
|
||||
|
@ -86,7 +85,6 @@ const formatTopicCreation = (form: TopicFormData): TopicCreation => {
|
|||
const configs = {
|
||||
'cleanup.policy': cleanupPolicy,
|
||||
'retention.ms': retentionMs.toString(),
|
||||
'retention.bytes': retentionBytes.toString(),
|
||||
'max.message.bytes': maxMessageBytes.toString(),
|
||||
'min.insync.replicas': minInSyncReplicas.toString(),
|
||||
...Object.values(customParams || {}).reduce(topicReducer, {}),
|
||||
|
|
|
@@ -1,9 +1,17 @@
import { Action, UserPermission, ResourceType } from 'generated-sources';
import { Action, ResourceType, UserPermission } from 'generated-sources';

export type RolesType = UserPermission[];

export type RolesModifiedTypes = Map<string, Map<ResourceType, RolesType>>;

const ResourceExemptList: ResourceType[] = [
  ResourceType.KSQL,
  ResourceType.CLUSTERCONFIG,
  ResourceType.APPLICATIONCONFIG,
  ResourceType.ACL,
  ResourceType.AUDIT,
];

export function modifyRolesData(
  data?: RolesType
): Map<string, Map<ResourceType, RolesType>> {
@@ -39,6 +47,12 @@ interface IsPermittedConfig {
  rbacFlag: boolean;
}

const valueMatches = (regexp: string | undefined, val: string | undefined) => {
  if (!val) return false;
  if (!regexp) return true;
  return new RegExp(regexp).test(val);
};

/**
 * @description it the logic behind depending on the roles whether a certain action
 * is permitted or not the philosophy is inspired from Headless UI libraries where
@@ -83,32 +97,18 @@ export function isPermitted({
  if (!clusterMap) return false;

  // short circuit
  const resourceData = clusterMap.get(resource);
  if (!resourceData) return false;
  const resourcePermissions = clusterMap.get(resource);
  if (!resourcePermissions) return false;

  return (
    resourceData.findIndex((item) => {
      let valueCheck = true;
      if (item.value) {
        valueCheck = false;
  const actions = Array.isArray(action) ? action : [action];

        if (value) valueCheck = new RegExp(item.value).test(value);
      }

      // short circuit
      if (!valueCheck) return false;

      if (!Array.isArray(action)) {
        return item.actions.includes(action);
      }

      // every given action should be found in that resource
      return action.every(
        (currentAction) =>
          item.actions.findIndex((element) => element === currentAction) !== -1
      );
    }) !== -1
  );
  return actions.every((a) => {
    return resourcePermissions.some((item) => {
      if (!item.actions.includes(a)) return false;
      if (ResourceExemptList.includes(resource)) return true;
      return valueMatches(item.value, value);
    });
  });
}

/**
@ -66,17 +66,17 @@ export const topicFormValidationSchema = yup.object().shape({
|
|||
name: yup
|
||||
.string()
|
||||
.max(249)
|
||||
.required()
|
||||
.required('Topic Name is required')
|
||||
.matches(
|
||||
TOPIC_NAME_VALIDATION_PATTERN,
|
||||
'Only alphanumeric, _, -, and . allowed'
|
||||
),
|
||||
partitions: yup
|
||||
.number()
|
||||
.min(1)
|
||||
.min(1, 'Number of Partitions must be greater than or equal to 1')
|
||||
.max(2147483647)
|
||||
.required()
|
||||
.typeError('Number of partitions is required and must be a number'),
|
||||
.typeError('Number of Partitions is required and must be a number'),
|
||||
replicationFactor: yup.string(),
|
||||
minInSyncReplicas: yup.string(),
|
||||
cleanupPolicy: yup.string().required(),
|
||||
|
|
|
@ -44,7 +44,6 @@ export interface TopicFormData {
|
|||
minInSyncReplicas: number;
|
||||
cleanupPolicy: string;
|
||||
retentionMs: number;
|
||||
retentionBytes: number;
|
||||
maxMessageBytes: number;
|
||||
customParams: {
|
||||
name: string;
|
||||
|
|
|
@ -291,6 +291,7 @@ const baseTheme = {
|
|||
},
|
||||
statictics: {
|
||||
createdAtColor: Colors.neutral[50],
|
||||
progressPctColor: Colors.neutral[100],
|
||||
},
|
||||
progressBar: {
|
||||
backgroundColor: Colors.neutral[3],
|
||||
|
|
4
pom.xml
4
pom.xml
|
@ -49,8 +49,8 @@
|
|||
<testcontainers.version>1.17.5</testcontainers.version>
|
||||
|
||||
<!-- Frontend dependency versions -->
|
||||
<node.version>v16.15.0</node.version>
|
||||
<pnpm.version>v7.4.0</pnpm.version>
|
||||
<node.version>v18.17.1</node.version>
|
||||
<pnpm.version>v8.6.12</pnpm.version>
|
||||
|
||||
<!-- Plugin versions -->
|
||||
<fabric8-maven-plugin.version>0.42.1</fabric8-maven-plugin.version>
|
||||
|
|