Merge a9a8756bc7
into cca2c96997
This commit is contained in:
commit
e749f1a5c9
16 changed files with 459 additions and 9 deletions
|
@ -0,0 +1,105 @@
|
|||
package com.provectus.kafka.ui.controller;
|
||||
|
||||
import static java.util.stream.Collectors.toMap;
|
||||
|
||||
import com.provectus.kafka.ui.api.ClientQuotasApi;
|
||||
import com.provectus.kafka.ui.model.ClientQuotasDTO;
|
||||
import com.provectus.kafka.ui.model.rbac.AccessContext;
|
||||
import com.provectus.kafka.ui.model.rbac.permission.ClientQuotaAction;
|
||||
import com.provectus.kafka.ui.service.quota.ClientQuotaRecord;
|
||||
import com.provectus.kafka.ui.service.quota.ClientQuotaService;
|
||||
import java.math.BigDecimal;
|
||||
import java.util.Comparator;
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import org.springframework.http.ResponseEntity;
|
||||
import org.springframework.web.bind.annotation.RestController;
|
||||
import org.springframework.web.server.ServerWebExchange;
|
||||
import reactor.core.publisher.Flux;
|
||||
import reactor.core.publisher.Mono;
|
||||
|
||||
@RestController
|
||||
@RequiredArgsConstructor
|
||||
public class ClientQuotasController extends AbstractController implements ClientQuotasApi {
|
||||
|
||||
private static final Comparator<ClientQuotaRecord> QUOTA_RECORDS_COMPARATOR =
|
||||
Comparator.nullsLast(Comparator.comparing(ClientQuotaRecord::user))
|
||||
.thenComparing(Comparator.nullsLast(Comparator.comparing(ClientQuotaRecord::clientId)))
|
||||
.thenComparing(Comparator.nullsLast(Comparator.comparing(ClientQuotaRecord::ip)));
|
||||
|
||||
private final ClientQuotaService clientQuotaService;
|
||||
|
||||
@Override
|
||||
public Mono<ResponseEntity<Flux<ClientQuotasDTO>>> listQuotas(String clusterName,
|
||||
ServerWebExchange exchange) {
|
||||
var context = AccessContext.builder()
|
||||
.cluster(clusterName)
|
||||
.operationName("listClientQuotas")
|
||||
.clientQuotaActions(ClientQuotaAction.VIEW)
|
||||
.build();
|
||||
|
||||
Mono<ResponseEntity<Flux<ClientQuotasDTO>>> operation =
|
||||
Mono.just(
|
||||
clientQuotaService.getAll(getCluster(clusterName))
|
||||
.sort(QUOTA_RECORDS_COMPARATOR)
|
||||
.map(this::mapToDto)
|
||||
).map(ResponseEntity::ok);
|
||||
|
||||
return validateAccess(context)
|
||||
.then(operation)
|
||||
.doOnEach(sig -> audit(context, sig));
|
||||
}
|
||||
|
||||
@Override
|
||||
public Mono<ResponseEntity<Void>> upsertClientQuotas(String clusterName,
|
||||
Mono<ClientQuotasDTO> quotasDto,
|
||||
ServerWebExchange exchange) {
|
||||
return quotasDto.flatMap(
|
||||
newQuotas -> {
|
||||
var context = AccessContext.builder()
|
||||
.cluster(clusterName)
|
||||
.operationName("upsertClientQuotas")
|
||||
.operationParams(Map.of("newQuotas", newQuotas))
|
||||
.clientQuotaActions(ClientQuotaAction.EDIT)
|
||||
.build();
|
||||
|
||||
Mono<ResponseEntity<Void>> operation = clientQuotaService.upsert(
|
||||
getCluster(clusterName),
|
||||
newQuotas.getUser(),
|
||||
newQuotas.getClientId(),
|
||||
newQuotas.getIp(),
|
||||
Optional.ofNullable(newQuotas.getQuotas()).orElse(Map.of())
|
||||
.entrySet()
|
||||
.stream()
|
||||
.collect(toMap(Map.Entry::getKey, e -> e.getValue().doubleValue()))
|
||||
)
|
||||
.map(statusCode -> ResponseEntity.status(statusCode).build());
|
||||
|
||||
return validateAccess(context)
|
||||
.then(operation)
|
||||
.doOnEach(sig -> audit(context, sig));
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
private ClientQuotasDTO mapToDto(ClientQuotaRecord quotaRecord) {
|
||||
return new ClientQuotasDTO()
|
||||
.user(quotaRecord.user())
|
||||
.clientId(quotaRecord.clientId())
|
||||
.ip(quotaRecord.ip())
|
||||
.quotas(
|
||||
quotaRecord.quotas().entrySet()
|
||||
.stream()
|
||||
.sorted(Map.Entry.comparingByKey())
|
||||
.collect(toMap(
|
||||
Map.Entry::getKey,
|
||||
e -> BigDecimal.valueOf(e.getValue()),
|
||||
(v1, v2) -> null, //won't be called
|
||||
LinkedHashMap::new //to keep order
|
||||
))
|
||||
);
|
||||
}
|
||||
|
||||
}
|
|
@ -6,5 +6,6 @@ public enum ClusterFeature {
|
|||
SCHEMA_REGISTRY,
|
||||
TOPIC_DELETION,
|
||||
KAFKA_ACL_VIEW,
|
||||
KAFKA_ACL_EDIT
|
||||
KAFKA_ACL_EDIT,
|
||||
CLIENT_QUOTA_MANAGEMENT
|
||||
}
|
||||
|
|
|
@ -3,6 +3,7 @@ package com.provectus.kafka.ui.model.rbac;
|
|||
import com.provectus.kafka.ui.model.rbac.permission.AclAction;
|
||||
import com.provectus.kafka.ui.model.rbac.permission.ApplicationConfigAction;
|
||||
import com.provectus.kafka.ui.model.rbac.permission.AuditAction;
|
||||
import com.provectus.kafka.ui.model.rbac.permission.ClientQuotaAction;
|
||||
import com.provectus.kafka.ui.model.rbac.permission.ClusterConfigAction;
|
||||
import com.provectus.kafka.ui.model.rbac.permission.ConnectAction;
|
||||
import com.provectus.kafka.ui.model.rbac.permission.ConsumerGroupAction;
|
||||
|
@ -44,6 +45,8 @@ public class AccessContext {
|
|||
|
||||
Collection<AuditAction> auditAction;
|
||||
|
||||
Collection<ClientQuotaAction> clientQuotaActions;
|
||||
|
||||
String operationName;
|
||||
Object operationParams;
|
||||
|
||||
|
@ -67,6 +70,7 @@ public class AccessContext {
|
|||
private Collection<KsqlAction> ksqlActions = Collections.emptySet();
|
||||
private Collection<AclAction> aclActions = Collections.emptySet();
|
||||
private Collection<AuditAction> auditActions = Collections.emptySet();
|
||||
private Collection<ClientQuotaAction> clientQuotaActions = Collections.emptySet();
|
||||
|
||||
private String operationName;
|
||||
private Object operationParams;
|
||||
|
@ -158,6 +162,12 @@ public class AccessContext {
|
|||
return this;
|
||||
}
|
||||
|
||||
public AccessContextBuilder clientQuotaActions(ClientQuotaAction... actions) {
|
||||
Assert.isTrue(actions.length > 0, "actions not present");
|
||||
this.clientQuotaActions = List.of(actions);
|
||||
return this;
|
||||
}
|
||||
|
||||
public AccessContextBuilder operationName(String operationName) {
|
||||
this.operationName = operationName;
|
||||
return this;
|
||||
|
@ -182,7 +192,7 @@ public class AccessContext {
|
|||
connect, connectActions,
|
||||
connector,
|
||||
schema, schemaActions,
|
||||
ksqlActions, aclActions, auditActions,
|
||||
ksqlActions, aclActions, auditActions, clientQuotaActions,
|
||||
operationName, operationParams);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -3,12 +3,14 @@ package com.provectus.kafka.ui.model.rbac;
|
|||
import static com.provectus.kafka.ui.model.rbac.Resource.ACL;
|
||||
import static com.provectus.kafka.ui.model.rbac.Resource.APPLICATIONCONFIG;
|
||||
import static com.provectus.kafka.ui.model.rbac.Resource.AUDIT;
|
||||
import static com.provectus.kafka.ui.model.rbac.Resource.CLIENT_QUOTAS;
|
||||
import static com.provectus.kafka.ui.model.rbac.Resource.CLUSTERCONFIG;
|
||||
import static com.provectus.kafka.ui.model.rbac.Resource.KSQL;
|
||||
|
||||
import com.provectus.kafka.ui.model.rbac.permission.AclAction;
|
||||
import com.provectus.kafka.ui.model.rbac.permission.ApplicationConfigAction;
|
||||
import com.provectus.kafka.ui.model.rbac.permission.AuditAction;
|
||||
import com.provectus.kafka.ui.model.rbac.permission.ClientQuotaAction;
|
||||
import com.provectus.kafka.ui.model.rbac.permission.ClusterConfigAction;
|
||||
import com.provectus.kafka.ui.model.rbac.permission.ConnectAction;
|
||||
import com.provectus.kafka.ui.model.rbac.permission.ConsumerGroupAction;
|
||||
|
@ -32,7 +34,7 @@ import org.springframework.util.Assert;
|
|||
public class Permission {
|
||||
|
||||
private static final List<Resource> RBAC_ACTION_EXEMPT_LIST =
|
||||
List.of(KSQL, CLUSTERCONFIG, APPLICATIONCONFIG, ACL, AUDIT);
|
||||
List.of(KSQL, CLUSTERCONFIG, APPLICATIONCONFIG, ACL, AUDIT, CLIENT_QUOTAS);
|
||||
|
||||
Resource resource;
|
||||
List<String> actions;
|
||||
|
@ -88,6 +90,7 @@ public class Permission {
|
|||
case KSQL -> Arrays.stream(KsqlAction.values()).map(Enum::toString).toList();
|
||||
case ACL -> Arrays.stream(AclAction.values()).map(Enum::toString).toList();
|
||||
case AUDIT -> Arrays.stream(AuditAction.values()).map(Enum::toString).toList();
|
||||
case CLIENT_QUOTAS -> Arrays.stream(ClientQuotaAction.values()).map(Enum::toString).toList();
|
||||
};
|
||||
}
|
||||
|
||||
|
|
|
@ -13,7 +13,8 @@ public enum Resource {
|
|||
CONNECT,
|
||||
KSQL,
|
||||
ACL,
|
||||
AUDIT;
|
||||
AUDIT,
|
||||
CLIENT_QUOTAS;
|
||||
|
||||
@Nullable
|
||||
public static Resource fromString(String name) {
|
||||
|
|
|
@ -0,0 +1,19 @@
|
|||
package com.provectus.kafka.ui.model.rbac.permission;
|
||||
|
||||
import java.util.Set;
|
||||
|
||||
public enum ClientQuotaAction implements PermissibleAction {
|
||||
|
||||
VIEW,
|
||||
EDIT
|
||||
|
||||
;
|
||||
|
||||
public static final Set<ClientQuotaAction> ALTER_ACTIONS = Set.of(EDIT);
|
||||
|
||||
@Override
|
||||
public boolean isAlter() {
|
||||
return ALTER_ACTIONS.contains(this);
|
||||
}
|
||||
|
||||
}
|
|
@ -4,7 +4,7 @@ public sealed interface PermissibleAction permits
|
|||
AclAction, ApplicationConfigAction,
|
||||
ConsumerGroupAction, SchemaAction,
|
||||
ConnectAction, ClusterConfigAction,
|
||||
KsqlAction, TopicAction, AuditAction {
|
||||
KsqlAction, TopicAction, AuditAction, ClientQuotaAction {
|
||||
|
||||
String name();
|
||||
|
||||
|
|
|
@ -1,5 +1,7 @@
|
|||
package com.provectus.kafka.ui.service;
|
||||
|
||||
import static com.provectus.kafka.ui.service.ReactiveAdminClient.SupportedFeature.CLIENT_QUOTA_MANAGEMENT;
|
||||
|
||||
import com.provectus.kafka.ui.model.ClusterFeature;
|
||||
import com.provectus.kafka.ui.model.KafkaCluster;
|
||||
import com.provectus.kafka.ui.service.ReactiveAdminClient.ClusterDescription;
|
||||
|
@ -41,6 +43,7 @@ public class FeatureService {
|
|||
features.add(topicDeletionEnabled(adminClient));
|
||||
features.add(aclView(adminClient));
|
||||
features.add(aclEdit(adminClient, clusterDescription));
|
||||
features.add(quotaManagement(adminClient));
|
||||
|
||||
return Flux.fromIterable(features).flatMap(m -> m).collectList();
|
||||
}
|
||||
|
@ -70,4 +73,10 @@ public class FeatureService {
|
|||
return adminClient.getClusterFeatures().contains(ReactiveAdminClient.SupportedFeature.AUTHORIZED_SECURITY_ENABLED);
|
||||
}
|
||||
|
||||
private Mono<ClusterFeature> quotaManagement(ReactiveAdminClient adminClient) {
|
||||
return adminClient.getClusterFeatures().contains(CLIENT_QUOTA_MANAGEMENT)
|
||||
? Mono.just(ClusterFeature.CLIENT_QUOTA_MANAGEMENT)
|
||||
: Mono.empty();
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -77,6 +77,9 @@ import org.apache.kafka.common.errors.SecurityDisabledException;
|
|||
import org.apache.kafka.common.errors.TopicAuthorizationException;
|
||||
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
|
||||
import org.apache.kafka.common.errors.UnsupportedVersionException;
|
||||
import org.apache.kafka.common.quota.ClientQuotaAlteration;
|
||||
import org.apache.kafka.common.quota.ClientQuotaEntity;
|
||||
import org.apache.kafka.common.quota.ClientQuotaFilter;
|
||||
import org.apache.kafka.common.requests.DescribeLogDirsResponse;
|
||||
import org.apache.kafka.common.resource.ResourcePatternFilter;
|
||||
import reactor.core.publisher.Flux;
|
||||
|
@ -94,7 +97,8 @@ public class ReactiveAdminClient implements Closeable {
|
|||
INCREMENTAL_ALTER_CONFIGS(2.3f),
|
||||
CONFIG_DOCUMENTATION_RETRIEVAL(2.6f),
|
||||
DESCRIBE_CLUSTER_INCLUDE_AUTHORIZED_OPERATIONS(2.3f),
|
||||
AUTHORIZED_SECURITY_ENABLED(ReactiveAdminClient::isAuthorizedSecurityEnabled);
|
||||
AUTHORIZED_SECURITY_ENABLED(ReactiveAdminClient::isAuthorizedSecurityEnabled),
|
||||
CLIENT_QUOTA_MANAGEMENT(2.6f);
|
||||
|
||||
private final BiFunction<AdminClient, Float, Mono<Boolean>> predicate;
|
||||
|
||||
|
@ -658,6 +662,14 @@ public class ReactiveAdminClient implements Closeable {
|
|||
return toMono(client.alterReplicaLogDirs(replicaAssignment).all());
|
||||
}
|
||||
|
||||
public Mono<Map<ClientQuotaEntity, Map<String, Double>>> getClientQuotas(ClientQuotaFilter filter) {
|
||||
return toMono(client.describeClientQuotas(filter).entities());
|
||||
}
|
||||
|
||||
public Mono<Void> alterClientQuota(ClientQuotaAlteration alteration) {
|
||||
return toMono(client.alterClientQuotas(List.of(alteration)).all());
|
||||
}
|
||||
|
||||
private Mono<Void> incrementalAlterConfig(String topicName,
|
||||
List<ConfigEntry> currentConfigs,
|
||||
Map<String, String> newConfigs) {
|
||||
|
|
|
@ -65,6 +65,8 @@ record AuditRecord(String timestamp,
|
|||
.forEach(a -> resources.add(create(a, Resource.ACL, null)));
|
||||
ctx.getAuditAction()
|
||||
.forEach(a -> resources.add(create(a, Resource.AUDIT, null)));
|
||||
ctx.getClientQuotaActions()
|
||||
.forEach(a -> resources.add(create(a, Resource.CLIENT_QUOTAS, null)));
|
||||
return resources;
|
||||
}
|
||||
|
||||
|
|
|
@ -0,0 +1,20 @@
|
|||
package com.provectus.kafka.ui.service.quota;
|
||||
|
||||
import jakarta.annotation.Nullable;
|
||||
import java.util.Map;
|
||||
import org.apache.kafka.common.quota.ClientQuotaEntity;
|
||||
|
||||
public record ClientQuotaRecord(@Nullable String user,
|
||||
@Nullable String clientId,
|
||||
@Nullable String ip,
|
||||
Map<String, Double> quotas) {
|
||||
|
||||
static ClientQuotaRecord create(ClientQuotaEntity entity, Map<String, Double> quotas) {
|
||||
return new ClientQuotaRecord(
|
||||
entity.entries().get(ClientQuotaEntity.USER),
|
||||
entity.entries().get(ClientQuotaEntity.CLIENT_ID),
|
||||
entity.entries().get(ClientQuotaEntity.IP),
|
||||
quotas
|
||||
);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,103 @@
|
|||
package com.provectus.kafka.ui.service.quota;
|
||||
|
||||
import static org.apache.kafka.common.quota.ClientQuotaEntity.CLIENT_ID;
|
||||
import static org.apache.kafka.common.quota.ClientQuotaEntity.IP;
|
||||
import static org.apache.kafka.common.quota.ClientQuotaEntity.USER;
|
||||
|
||||
import com.google.common.collect.Sets;
|
||||
import com.provectus.kafka.ui.exception.ValidationException;
|
||||
import com.provectus.kafka.ui.model.KafkaCluster;
|
||||
import com.provectus.kafka.ui.service.AdminClientService;
|
||||
import com.provectus.kafka.ui.service.ReactiveAdminClient;
|
||||
import jakarta.annotation.Nullable;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
import java.util.stream.Stream;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import org.apache.kafka.common.quota.ClientQuotaAlteration;
|
||||
import org.apache.kafka.common.quota.ClientQuotaEntity;
|
||||
import org.apache.kafka.common.quota.ClientQuotaFilter;
|
||||
import org.apache.kafka.common.quota.ClientQuotaFilterComponent;
|
||||
import org.springframework.http.HttpStatus;
|
||||
import org.springframework.stereotype.Service;
|
||||
import reactor.core.publisher.Flux;
|
||||
import reactor.core.publisher.Mono;
|
||||
|
||||
@Service
|
||||
@RequiredArgsConstructor
|
||||
public class ClientQuotaService {
|
||||
|
||||
private final AdminClientService adminClientService;
|
||||
|
||||
public Flux<ClientQuotaRecord> getAll(KafkaCluster cluster) {
|
||||
return adminClientService.get(cluster)
|
||||
.flatMap(ac -> ac.getClientQuotas(ClientQuotaFilter.all()))
|
||||
.flatMapIterable(Map::entrySet)
|
||||
.map(e -> ClientQuotaRecord.create(e.getKey(), e.getValue()));
|
||||
}
|
||||
|
||||
//returns 201 if new entity was created, 200 if existing was updated, 204 if existing was deleted
|
||||
public Mono<HttpStatus> upsert(KafkaCluster cluster,
|
||||
@Nullable String user,
|
||||
@Nullable String clientId,
|
||||
@Nullable String ip,
|
||||
Map<String, Double> newQuotas) {
|
||||
ClientQuotaEntity quotaEntity = quotaEntity(user, clientId, ip);
|
||||
return adminClientService.get(cluster)
|
||||
.flatMap(ac ->
|
||||
findQuotas(ac, quotaEntity)
|
||||
.flatMap(currentQuotas -> {
|
||||
HttpStatus result = HttpStatus.OK; //updated
|
||||
if (newQuotas.isEmpty()) {
|
||||
result = HttpStatus.NO_CONTENT; //deleted
|
||||
} else if (currentQuotas.isEmpty()) {
|
||||
result = HttpStatus.CREATED;
|
||||
}
|
||||
var alteration = createAlteration(quotaEntity, currentQuotas, newQuotas);
|
||||
return ac.alterClientQuota(alteration)
|
||||
.thenReturn(result);
|
||||
})
|
||||
);
|
||||
}
|
||||
|
||||
private ClientQuotaEntity quotaEntity(@Nullable String user, @Nullable String clientId, @Nullable String ip) {
|
||||
if (Stream.of(user, clientId, ip).allMatch(Objects::isNull)) {
|
||||
throw new ValidationException("Quota entity id is not set");
|
||||
}
|
||||
var id = new HashMap<String, String>();
|
||||
Optional.ofNullable(user).ifPresent(u -> id.put(USER, u));
|
||||
Optional.ofNullable(clientId).ifPresent(cid -> id.put(CLIENT_ID, cid));
|
||||
Optional.ofNullable(ip).ifPresent(i -> id.put(IP, i));
|
||||
return new ClientQuotaEntity(id);
|
||||
}
|
||||
|
||||
private ClientQuotaAlteration createAlteration(ClientQuotaEntity quotaEntity,
|
||||
Map<String, Double> currentQuotas,
|
||||
Map<String, Double> newQuotas) {
|
||||
Set<String> quotasToClear = Sets.difference(currentQuotas.keySet(), newQuotas.keySet());
|
||||
List<ClientQuotaAlteration.Op> ops = Stream.concat(
|
||||
quotasToClear.stream()
|
||||
.map(name -> new ClientQuotaAlteration.Op(name, null)), //setting null value to clear current state
|
||||
newQuotas.entrySet().stream()
|
||||
.map(e -> new ClientQuotaAlteration.Op(e.getKey(), e.getValue()))
|
||||
).toList();
|
||||
return new ClientQuotaAlteration(quotaEntity, ops);
|
||||
}
|
||||
|
||||
// returns empty map if no quotas found for an entity
|
||||
private Mono<Map<String, Double>> findQuotas(ReactiveAdminClient ac, ClientQuotaEntity quotaEntity) {
|
||||
return ac.getClientQuotas(crateSearchFilter(quotaEntity))
|
||||
.map(found -> Optional.ofNullable(found.get(quotaEntity)).orElse(Map.of()));
|
||||
}
|
||||
|
||||
private ClientQuotaFilter crateSearchFilter(ClientQuotaEntity quotaEntity) {
|
||||
List<ClientQuotaFilterComponent> filters = new ArrayList<>();
|
||||
quotaEntity.entries().forEach((type, name) -> filters.add(ClientQuotaFilterComponent.ofEntity(type, name)));
|
||||
return ClientQuotaFilter.contains(filters);
|
||||
}
|
||||
}
|
|
@ -123,7 +123,8 @@ public class AccessControlService {
|
|||
&& isSchemaAccessible(context, user)
|
||||
&& isKsqlAccessible(context, user)
|
||||
&& isAclAccessible(context, user)
|
||||
&& isAuditAccessible(context, user);
|
||||
&& isAuditAccessible(context, user)
|
||||
&& isClientQuotaAccessible(context, user);
|
||||
|
||||
if (!accessGranted) {
|
||||
throw new AccessDeniedException(ACCESS_DENIED);
|
||||
|
@ -417,6 +418,23 @@ public class AccessControlService {
|
|||
return isAccessible(Resource.AUDIT, null, user, context, requiredActions);
|
||||
}
|
||||
|
||||
private boolean isClientQuotaAccessible(AccessContext context, AuthenticatedUser user) {
|
||||
if (!rbacEnabled) {
|
||||
return true;
|
||||
}
|
||||
|
||||
if (context.getClientQuotaActions().isEmpty()) {
|
||||
return true;
|
||||
}
|
||||
|
||||
Set<String> requiredActions = context.getClientQuotaActions()
|
||||
.stream()
|
||||
.map(a -> a.toString().toUpperCase())
|
||||
.collect(Collectors.toSet());
|
||||
|
||||
return isAccessible(Resource.CLIENT_QUOTAS, null, user, context, requiredActions);
|
||||
}
|
||||
|
||||
public Set<ProviderAuthorityExtractor> getOauthExtractors() {
|
||||
return oauthExtractors;
|
||||
}
|
||||
|
|
|
@ -8,6 +8,7 @@ import com.provectus.kafka.ui.config.auth.AuthenticatedUser;
|
|||
import com.provectus.kafka.ui.model.rbac.AccessContext;
|
||||
import com.provectus.kafka.ui.model.rbac.AccessContext.AccessContextBuilder;
|
||||
import com.provectus.kafka.ui.model.rbac.permission.AclAction;
|
||||
import com.provectus.kafka.ui.model.rbac.permission.ClientQuotaAction;
|
||||
import com.provectus.kafka.ui.model.rbac.permission.ClusterConfigAction;
|
||||
import com.provectus.kafka.ui.model.rbac.permission.ConnectAction;
|
||||
import com.provectus.kafka.ui.model.rbac.permission.ConsumerGroupAction;
|
||||
|
@ -55,9 +56,11 @@ class AuditWriterTest {
|
|||
SchemaAction.ALTER_ACTIONS.stream().map(a -> c -> c.schema("sc").schemaActions(a));
|
||||
Stream<UnaryOperator<AccessContextBuilder>> connEditActions =
|
||||
ConnectAction.ALTER_ACTIONS.stream().map(a -> c -> c.connect("conn").connectActions(a));
|
||||
Stream<UnaryOperator<AccessContextBuilder>> quotaEditActions =
|
||||
ClientQuotaAction.ALTER_ACTIONS.stream().map(a -> c -> c.clientQuotaActions(a));
|
||||
return Stream.of(
|
||||
topicEditActions, clusterConfigEditActions, aclEditActions,
|
||||
cgEditActions, connEditActions, schemaEditActions
|
||||
cgEditActions, connEditActions, schemaEditActions, quotaEditActions
|
||||
)
|
||||
.flatMap(c -> c)
|
||||
.map(setter -> setter.apply(AccessContext.builder().cluster("test").operationName("test")).build());
|
||||
|
@ -78,7 +81,8 @@ class AuditWriterTest {
|
|||
c -> c.aclActions(AclAction.VIEW),
|
||||
c -> c.consumerGroup("cg").consumerGroupActions(ConsumerGroupAction.VIEW),
|
||||
c -> c.schema("sc").schemaActions(SchemaAction.VIEW),
|
||||
c -> c.connect("conn").connectActions(ConnectAction.VIEW)
|
||||
c -> c.connect("conn").connectActions(ConnectAction.VIEW),
|
||||
c -> c.clientQuotaActions(ClientQuotaAction.VIEW)
|
||||
).map(setter -> setter.apply(AccessContext.builder().cluster("test").operationName("test")).build());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,78 @@
|
|||
package com.provectus.kafka.ui.service.quota;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
import com.provectus.kafka.ui.AbstractIntegrationTest;
|
||||
import com.provectus.kafka.ui.model.KafkaCluster;
|
||||
import com.provectus.kafka.ui.service.ClustersStorage;
|
||||
import java.util.Map;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.params.ParameterizedTest;
|
||||
import org.junit.jupiter.params.provider.CsvSource;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import reactor.test.StepVerifier;
|
||||
|
||||
class ClientQuotaServiceTest extends AbstractIntegrationTest {
|
||||
|
||||
@Autowired
|
||||
ClientQuotaService quotaService;
|
||||
|
||||
private KafkaCluster cluster;
|
||||
|
||||
@BeforeEach
|
||||
void init() {
|
||||
cluster = applicationContext.getBean(ClustersStorage.class).getClusterByName(LOCAL).get();
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@CsvSource(
|
||||
value = {
|
||||
"testUser, null, null ",
|
||||
"null, testUserId, null",
|
||||
"testUser2, testUserId2, null",
|
||||
},
|
||||
nullValues = "null"
|
||||
)
|
||||
void createUpdateDelete(String user, String clientId, String ip) {
|
||||
var initialQuotas = Map.of(
|
||||
"producer_byte_rate", 123.0,
|
||||
"consumer_byte_rate", 234.0,
|
||||
"request_percentage", 10.0
|
||||
);
|
||||
|
||||
//creating new
|
||||
StepVerifier.create(
|
||||
quotaService.upsert(cluster, user, clientId, ip, initialQuotas)
|
||||
)
|
||||
.assertNext(status -> assertThat(status.value()).isEqualTo(201))
|
||||
.verifyComplete();
|
||||
|
||||
assertThat(quotaRecordExists(new ClientQuotaRecord(user, clientId, ip, initialQuotas)))
|
||||
.isTrue();
|
||||
|
||||
//updating
|
||||
StepVerifier.create(
|
||||
quotaService.upsert(cluster, user, clientId, ip, Map.of("producer_byte_rate", 22222.0))
|
||||
)
|
||||
.assertNext(status -> assertThat(status.value()).isEqualTo(200))
|
||||
.verifyComplete();
|
||||
|
||||
assertThat(quotaRecordExists(new ClientQuotaRecord(user, clientId, ip, Map.of("producer_byte_rate", 22222.0))))
|
||||
.isTrue();
|
||||
|
||||
//deleting created record
|
||||
StepVerifier.create(
|
||||
quotaService.upsert(cluster, user, clientId, ip, Map.of())
|
||||
)
|
||||
.assertNext(status -> assertThat(status.value()).isEqualTo(204))
|
||||
.verifyComplete();
|
||||
|
||||
assertThat(quotaRecordExists(new ClientQuotaRecord(user, clientId, ip, Map.of("producer_byte_rate", 22222.0))))
|
||||
.isFalse();
|
||||
}
|
||||
|
||||
private boolean quotaRecordExists(ClientQuotaRecord rec) {
|
||||
return quotaService.getAll(cluster).collectList().block().contains(rec);
|
||||
}
|
||||
|
||||
}
|
|
@ -1910,6 +1910,55 @@ paths:
|
|||
200:
|
||||
description: OK
|
||||
|
||||
/api/clusters/{clusterName}/clientquotas:
|
||||
get:
|
||||
tags:
|
||||
- ClientQuotas
|
||||
summary: listQuotas
|
||||
operationId: listQuotas
|
||||
parameters:
|
||||
- name: clusterName
|
||||
in: path
|
||||
required: true
|
||||
schema:
|
||||
type: string
|
||||
responses:
|
||||
200:
|
||||
description: OK
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
type: array
|
||||
items:
|
||||
$ref: '#/components/schemas/ClientQuotas'
|
||||
post:
|
||||
tags:
|
||||
- ClientQuotas
|
||||
summary: upsertClientQuotas
|
||||
operationId: upsertClientQuotas
|
||||
description: |
|
||||
- updates/creates client quota record if `quotas` field is non-empty
|
||||
- deletes client quota record if `quotas` field is null or empty
|
||||
parameters:
|
||||
- name: clusterName
|
||||
in: path
|
||||
required: true
|
||||
schema:
|
||||
type: string
|
||||
requestBody:
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: '#/components/schemas/ClientQuotas'
|
||||
responses:
|
||||
200:
|
||||
description: Existing quota updated
|
||||
201:
|
||||
description: New quota created
|
||||
204:
|
||||
description: Existing quota deleted
|
||||
|
||||
|
||||
/api/clusters/{clusterName}/acl/streamApp:
|
||||
post:
|
||||
tags:
|
||||
|
@ -2175,6 +2224,7 @@ components:
|
|||
- TOPIC_DELETION
|
||||
- KAFKA_ACL_VIEW # get ACLs listing
|
||||
- KAFKA_ACL_EDIT # create & delete ACLs
|
||||
- CLIENT_QUOTA_MANAGEMENT
|
||||
required:
|
||||
- id
|
||||
- name
|
||||
|
@ -3600,6 +3650,7 @@ components:
|
|||
- KSQL
|
||||
- ACL
|
||||
- AUDIT
|
||||
- CLIENT_QUOTAS
|
||||
|
||||
KafkaAcl:
|
||||
type: object
|
||||
|
@ -3701,6 +3752,20 @@ components:
|
|||
nullable: false
|
||||
type: string
|
||||
|
||||
ClientQuotas:
|
||||
type: object
|
||||
properties:
|
||||
user:
|
||||
type: string
|
||||
clientId:
|
||||
type: string
|
||||
ip:
|
||||
type: string
|
||||
quotas:
|
||||
type: object
|
||||
additionalProperties:
|
||||
type: number
|
||||
|
||||
KafkaAclResourceType:
|
||||
type: string
|
||||
enum:
|
||||
|
|
Loading…
Add table
Reference in a new issue