This commit is contained in:
iliax 2023-08-11 19:24:24 +04:00
parent f69c484dd7
commit 5c6cd565a6
6 changed files with 56 additions and 33 deletions

View file

@ -47,29 +47,32 @@ public class ClientQuotasController extends AbstractController implements Client
public Mono<ResponseEntity<Void>> upsertClientQuotas(String clusterName, public Mono<ResponseEntity<Void>> upsertClientQuotas(String clusterName,
Mono<ClientQuotasDTO> quotasDto, Mono<ClientQuotasDTO> quotasDto,
ServerWebExchange exchange) { ServerWebExchange exchange) {
var context = AccessContext.builder() return quotasDto.flatMap(
.cluster(clusterName) newQuotas -> {
.operationName("upsertClientQuotas") var context = AccessContext.builder()
.clientQuotaActions(ClientQuotaAction.EDIT) .cluster(clusterName)
.build(); .operationName("upsertClientQuotas")
.operationParams(Map.of("newQuotas", newQuotas))
.clientQuotaActions(ClientQuotaAction.EDIT)
.build();
Mono<ResponseEntity<Void>> operation = quotasDto.flatMap( Mono<ResponseEntity<Void>> operation = clientQuotaService.upsert(
newQuotas -> getCluster(clusterName),
clientQuotaService.upsert( newQuotas.getUser(),
getCluster(clusterName), newQuotas.getClientId(),
newQuotas.getUser(), newQuotas.getIp(),
newQuotas.getClientId(), Optional.ofNullable(newQuotas.getQuotas()).orElse(Map.of())
newQuotas.getIp(), .entrySet()
Optional.ofNullable(newQuotas.getQuotas()).orElse(Map.of()) .stream()
.entrySet() .collect(toMap(Map.Entry::getKey, e -> e.getValue().doubleValue()))
.stream() )
.collect(toMap(Map.Entry::getKey, e -> e.getValue().doubleValue())) .map(statusCode -> ResponseEntity.status(statusCode).build());
)
).map(statusCode -> ResponseEntity.status(statusCode).build());
return validateAccess(context) return validateAccess(context)
.then(operation) .then(operation)
.doOnEach(sig -> audit(context, sig)); .doOnEach(sig -> audit(context, sig));
}
);
} }
private ClientQuotasDTO mapToDto(ClientQuotaRecord quotaRecord) { private ClientQuotasDTO mapToDto(ClientQuotaRecord quotaRecord) {

View file

@ -11,6 +11,7 @@ public enum ClientQuotaAction implements PermissibleAction {
public static final Set<ClientQuotaAction> ALTER_ACTIONS = Set.of(EDIT); public static final Set<ClientQuotaAction> ALTER_ACTIONS = Set.of(EDIT);
@Override
public boolean isAlter() { public boolean isAlter() {
return ALTER_ACTIONS.contains(this); return ALTER_ACTIONS.contains(this);
} }

View file

@ -65,6 +65,8 @@ record AuditRecord(String timestamp,
.forEach(a -> resources.add(create(a, Resource.ACL, null))); .forEach(a -> resources.add(create(a, Resource.ACL, null)));
ctx.getAuditAction() ctx.getAuditAction()
.forEach(a -> resources.add(create(a, Resource.AUDIT, null))); .forEach(a -> resources.add(create(a, Resource.AUDIT, null)));
ctx.getClientQuotaActions()
.forEach(a -> resources.add(create(a, Resource.CLIENT_QUOTAS, null)));
return resources; return resources;
} }

View file

@ -4,7 +4,6 @@ import static org.apache.kafka.common.quota.ClientQuotaEntity.CLIENT_ID;
import static org.apache.kafka.common.quota.ClientQuotaEntity.IP; import static org.apache.kafka.common.quota.ClientQuotaEntity.IP;
import static org.apache.kafka.common.quota.ClientQuotaEntity.USER; import static org.apache.kafka.common.quota.ClientQuotaEntity.USER;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Sets; import com.google.common.collect.Sets;
import com.provectus.kafka.ui.exception.ValidationException; import com.provectus.kafka.ui.exception.ValidationException;
import com.provectus.kafka.ui.model.KafkaCluster; import com.provectus.kafka.ui.model.KafkaCluster;
@ -70,8 +69,7 @@ public class ClientQuotaService {
); );
} }
@VisibleForTesting private ClientQuotaEntity quotaEntity(@Nullable String user, @Nullable String clientId, @Nullable String ip) {
static ClientQuotaEntity quotaEntity(@Nullable String user, @Nullable String clientId, @Nullable String ip) {
if (Stream.of(user, clientId, ip).allMatch(Objects::isNull)) { if (Stream.of(user, clientId, ip).allMatch(Objects::isNull)) {
throw new ValidationException("Quota entity id is not set"); throw new ValidationException("Quota entity id is not set");
} }

View file

@ -8,6 +8,7 @@ import com.provectus.kafka.ui.config.auth.AuthenticatedUser;
import com.provectus.kafka.ui.model.rbac.AccessContext; import com.provectus.kafka.ui.model.rbac.AccessContext;
import com.provectus.kafka.ui.model.rbac.AccessContext.AccessContextBuilder; import com.provectus.kafka.ui.model.rbac.AccessContext.AccessContextBuilder;
import com.provectus.kafka.ui.model.rbac.permission.AclAction; import com.provectus.kafka.ui.model.rbac.permission.AclAction;
import com.provectus.kafka.ui.model.rbac.permission.ClientQuotaAction;
import com.provectus.kafka.ui.model.rbac.permission.ClusterConfigAction; import com.provectus.kafka.ui.model.rbac.permission.ClusterConfigAction;
import com.provectus.kafka.ui.model.rbac.permission.ConnectAction; import com.provectus.kafka.ui.model.rbac.permission.ConnectAction;
import com.provectus.kafka.ui.model.rbac.permission.ConsumerGroupAction; import com.provectus.kafka.ui.model.rbac.permission.ConsumerGroupAction;
@ -55,9 +56,11 @@ class AuditWriterTest {
SchemaAction.ALTER_ACTIONS.stream().map(a -> c -> c.schema("sc").schemaActions(a)); SchemaAction.ALTER_ACTIONS.stream().map(a -> c -> c.schema("sc").schemaActions(a));
Stream<UnaryOperator<AccessContextBuilder>> connEditActions = Stream<UnaryOperator<AccessContextBuilder>> connEditActions =
ConnectAction.ALTER_ACTIONS.stream().map(a -> c -> c.connect("conn").connectActions(a)); ConnectAction.ALTER_ACTIONS.stream().map(a -> c -> c.connect("conn").connectActions(a));
Stream<UnaryOperator<AccessContextBuilder>> quotaEditActions =
ClientQuotaAction.ALTER_ACTIONS.stream().map(a -> c -> c.clientQuotaActions(a));
return Stream.of( return Stream.of(
topicEditActions, clusterConfigEditActions, aclEditActions, topicEditActions, clusterConfigEditActions, aclEditActions,
cgEditActions, connEditActions, schemaEditActions cgEditActions, connEditActions, schemaEditActions, quotaEditActions
) )
.flatMap(c -> c) .flatMap(c -> c)
.map(setter -> setter.apply(AccessContext.builder().cluster("test").operationName("test")).build()); .map(setter -> setter.apply(AccessContext.builder().cluster("test").operationName("test")).build());
@ -78,7 +81,8 @@ class AuditWriterTest {
c -> c.aclActions(AclAction.VIEW), c -> c.aclActions(AclAction.VIEW),
c -> c.consumerGroup("cg").consumerGroupActions(ConsumerGroupAction.VIEW), c -> c.consumerGroup("cg").consumerGroupActions(ConsumerGroupAction.VIEW),
c -> c.schema("sc").schemaActions(SchemaAction.VIEW), c -> c.schema("sc").schemaActions(SchemaAction.VIEW),
c -> c.connect("conn").connectActions(ConnectAction.VIEW) c -> c.connect("conn").connectActions(ConnectAction.VIEW),
c -> c.clientQuotaActions(ClientQuotaAction.VIEW)
).map(setter -> setter.apply(AccessContext.builder().cluster("test").operationName("test")).build()); ).map(setter -> setter.apply(AccessContext.builder().cluster("test").operationName("test")).build());
} }
} }

View file

@ -30,17 +30,13 @@ class ClientQuotaServiceTest extends AbstractIntegrationTest {
"testUser, null, null ", "testUser, null, null ",
"null, testUserId, null", "null, testUserId, null",
"testUser2, testUserId2, null", "testUser2, testUserId2, null",
"null, null, 127.0.0.1"
}, },
nullValues = "null" nullValues = "null"
) )
void createsQuotaRecord(String user, String clientId, String ip) { void createUpdateDelete(String user, String clientId, String ip) {
//creating new
StepVerifier.create( StepVerifier.create(
quotaService.upsert( quotaService.upsert(cluster, user, clientId, ip,
cluster,
user,
clientId,
ip,
Map.of( Map.of(
"producer_byte_rate", 123.0, "producer_byte_rate", 123.0,
"consumer_byte_rate", 234.0, "consumer_byte_rate", 234.0,
@ -50,6 +46,25 @@ class ClientQuotaServiceTest extends AbstractIntegrationTest {
) )
.assertNext(status -> assertThat(status.value()).isEqualTo(201)) .assertNext(status -> assertThat(status.value()).isEqualTo(201))
.verifyComplete(); .verifyComplete();
//updating
StepVerifier.create(
quotaService.upsert(cluster, user, clientId, ip,
Map.of(
"producer_byte_rate", 111111.0,
"consumer_byte_rate", 22222.0
)
)
)
.assertNext(status -> assertThat(status.value()).isEqualTo(200))
.verifyComplete();
//deleting just created record
StepVerifier.create(
quotaService.upsert(cluster, user, clientId, ip, Map.of())
)
.assertNext(status -> assertThat(status.value()).isEqualTo(204))
.verifyComplete();
} }
} }