diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/ClientQuotasController.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/ClientQuotasController.java index 59fc5cdcc2..64f4204562 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/ClientQuotasController.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/ClientQuotasController.java @@ -47,29 +47,32 @@ public class ClientQuotasController extends AbstractController implements Client public Mono> upsertClientQuotas(String clusterName, Mono quotasDto, ServerWebExchange exchange) { - var context = AccessContext.builder() - .cluster(clusterName) - .operationName("upsertClientQuotas") - .clientQuotaActions(ClientQuotaAction.EDIT) - .build(); + return quotasDto.flatMap( + newQuotas -> { + var context = AccessContext.builder() + .cluster(clusterName) + .operationName("upsertClientQuotas") + .operationParams(Map.of("newQuotas", newQuotas)) + .clientQuotaActions(ClientQuotaAction.EDIT) + .build(); - Mono> operation = quotasDto.flatMap( - newQuotas -> - clientQuotaService.upsert( - getCluster(clusterName), - newQuotas.getUser(), - newQuotas.getClientId(), - newQuotas.getIp(), - Optional.ofNullable(newQuotas.getQuotas()).orElse(Map.of()) - .entrySet() - .stream() - .collect(toMap(Map.Entry::getKey, e -> e.getValue().doubleValue())) - ) - ).map(statusCode -> ResponseEntity.status(statusCode).build()); + Mono> operation = clientQuotaService.upsert( + getCluster(clusterName), + newQuotas.getUser(), + newQuotas.getClientId(), + newQuotas.getIp(), + Optional.ofNullable(newQuotas.getQuotas()).orElse(Map.of()) + .entrySet() + .stream() + .collect(toMap(Map.Entry::getKey, e -> e.getValue().doubleValue())) + ) + .map(statusCode -> ResponseEntity.status(statusCode).build()); - return validateAccess(context) - .then(operation) - .doOnEach(sig -> audit(context, sig)); + return validateAccess(context) + .then(operation) + .doOnEach(sig -> audit(context, sig)); + } + ); } private ClientQuotasDTO mapToDto(ClientQuotaRecord quotaRecord) { diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/rbac/permission/ClientQuotaAction.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/rbac/permission/ClientQuotaAction.java index 1b451f7c7c..b2ff994d0d 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/rbac/permission/ClientQuotaAction.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/rbac/permission/ClientQuotaAction.java @@ -11,6 +11,7 @@ public enum ClientQuotaAction implements PermissibleAction { public static final Set ALTER_ACTIONS = Set.of(EDIT); + @Override public boolean isAlter() { return ALTER_ACTIONS.contains(this); } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/audit/AuditRecord.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/audit/AuditRecord.java index 3f7fb44aac..870da8668d 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/audit/AuditRecord.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/audit/AuditRecord.java @@ -65,6 +65,8 @@ record AuditRecord(String timestamp, .forEach(a -> resources.add(create(a, Resource.ACL, null))); ctx.getAuditAction() .forEach(a -> resources.add(create(a, Resource.AUDIT, null))); + ctx.getClientQuotaActions() + .forEach(a -> resources.add(create(a, Resource.CLIENT_QUOTAS, null))); return resources; } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/quota/ClientQuotaService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/quota/ClientQuotaService.java index 3c78a9bf81..97f17c12c8 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/quota/ClientQuotaService.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/quota/ClientQuotaService.java @@ -4,7 +4,6 @@ import static org.apache.kafka.common.quota.ClientQuotaEntity.CLIENT_ID; import static org.apache.kafka.common.quota.ClientQuotaEntity.IP; import static org.apache.kafka.common.quota.ClientQuotaEntity.USER; -import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Sets; import com.provectus.kafka.ui.exception.ValidationException; import com.provectus.kafka.ui.model.KafkaCluster; @@ -70,8 +69,7 @@ public class ClientQuotaService { ); } - @VisibleForTesting - static ClientQuotaEntity quotaEntity(@Nullable String user, @Nullable String clientId, @Nullable String ip) { + private ClientQuotaEntity quotaEntity(@Nullable String user, @Nullable String clientId, @Nullable String ip) { if (Stream.of(user, clientId, ip).allMatch(Objects::isNull)) { throw new ValidationException("Quota entity id is not set"); } diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/audit/AuditWriterTest.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/audit/AuditWriterTest.java index 5bcee45ac8..4213e888d3 100644 --- a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/audit/AuditWriterTest.java +++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/audit/AuditWriterTest.java @@ -8,6 +8,7 @@ import com.provectus.kafka.ui.config.auth.AuthenticatedUser; import com.provectus.kafka.ui.model.rbac.AccessContext; import com.provectus.kafka.ui.model.rbac.AccessContext.AccessContextBuilder; import com.provectus.kafka.ui.model.rbac.permission.AclAction; +import com.provectus.kafka.ui.model.rbac.permission.ClientQuotaAction; import com.provectus.kafka.ui.model.rbac.permission.ClusterConfigAction; import com.provectus.kafka.ui.model.rbac.permission.ConnectAction; import com.provectus.kafka.ui.model.rbac.permission.ConsumerGroupAction; @@ -55,9 +56,11 @@ class AuditWriterTest { SchemaAction.ALTER_ACTIONS.stream().map(a -> c -> c.schema("sc").schemaActions(a)); Stream> connEditActions = ConnectAction.ALTER_ACTIONS.stream().map(a -> c -> c.connect("conn").connectActions(a)); + Stream> quotaEditActions = + ClientQuotaAction.ALTER_ACTIONS.stream().map(a -> c -> c.clientQuotaActions(a)); return Stream.of( topicEditActions, clusterConfigEditActions, aclEditActions, - cgEditActions, connEditActions, schemaEditActions + cgEditActions, connEditActions, schemaEditActions, quotaEditActions ) .flatMap(c -> c) .map(setter -> setter.apply(AccessContext.builder().cluster("test").operationName("test")).build()); @@ -78,7 +81,8 @@ class AuditWriterTest { c -> c.aclActions(AclAction.VIEW), c -> c.consumerGroup("cg").consumerGroupActions(ConsumerGroupAction.VIEW), c -> c.schema("sc").schemaActions(SchemaAction.VIEW), - c -> c.connect("conn").connectActions(ConnectAction.VIEW) + c -> c.connect("conn").connectActions(ConnectAction.VIEW), + c -> c.clientQuotaActions(ClientQuotaAction.VIEW) ).map(setter -> setter.apply(AccessContext.builder().cluster("test").operationName("test")).build()); } } diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/quota/ClientQuotaServiceTest.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/quota/ClientQuotaServiceTest.java index d79a62c78a..cffc012ca3 100644 --- a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/quota/ClientQuotaServiceTest.java +++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/quota/ClientQuotaServiceTest.java @@ -30,17 +30,13 @@ class ClientQuotaServiceTest extends AbstractIntegrationTest { "testUser, null, null ", "null, testUserId, null", "testUser2, testUserId2, null", - "null, null, 127.0.0.1" }, nullValues = "null" ) - void createsQuotaRecord(String user, String clientId, String ip) { + void createUpdateDelete(String user, String clientId, String ip) { + //creating new StepVerifier.create( - quotaService.upsert( - cluster, - user, - clientId, - ip, + quotaService.upsert(cluster, user, clientId, ip, Map.of( "producer_byte_rate", 123.0, "consumer_byte_rate", 234.0, @@ -50,6 +46,25 @@ class ClientQuotaServiceTest extends AbstractIntegrationTest { ) .assertNext(status -> assertThat(status.value()).isEqualTo(201)) .verifyComplete(); + + //updating + StepVerifier.create( + quotaService.upsert(cluster, user, clientId, ip, + Map.of( + "producer_byte_rate", 111111.0, + "consumer_byte_rate", 22222.0 + ) + ) + ) + .assertNext(status -> assertThat(status.value()).isEqualTo(200)) + .verifyComplete(); + + //deleting just created record + StepVerifier.create( + quotaService.upsert(cluster, user, clientId, ip, Map.of()) + ) + .assertNext(status -> assertThat(status.value()).isEqualTo(204)) + .verifyComplete(); } }