diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/ClientQuotasController.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/ClientQuotasController.java index acb97496ed..59fc5cdcc2 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/ClientQuotasController.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/ClientQuotasController.java @@ -4,10 +4,10 @@ import static java.util.stream.Collectors.toMap; import com.provectus.kafka.ui.api.ClientQuotasApi; import com.provectus.kafka.ui.model.ClientQuotasDTO; -import com.provectus.kafka.ui.service.audit.AuditService; +import com.provectus.kafka.ui.model.rbac.AccessContext; +import com.provectus.kafka.ui.model.rbac.permission.ClientQuotaAction; import com.provectus.kafka.ui.service.quota.ClientQuotaRecord; -import com.provectus.kafka.ui.service.quota.QuotaService; -import com.provectus.kafka.ui.service.rbac.AccessControlService; +import com.provectus.kafka.ui.service.quota.ClientQuotaService; import java.math.BigDecimal; import java.util.LinkedHashMap; import java.util.Map; @@ -23,37 +23,56 @@ import reactor.core.publisher.Mono; @RequiredArgsConstructor public class ClientQuotasController extends AbstractController implements ClientQuotasApi { - private final QuotaService quotaService; - private final AccessControlService accessControlService; - private final AuditService auditService; + private final ClientQuotaService clientQuotaService; @Override public Mono>> listQuotas(String clusterName, ServerWebExchange exchange) { - return Mono.just(quotaService.list(getCluster(clusterName)).map(this::map)) - .map(ResponseEntity::ok); + var context = AccessContext.builder() + .cluster(clusterName) + .operationName("listClientQuotas") + .clientQuotaActions(ClientQuotaAction.VIEW) + .build(); + + Mono>> operation = + Mono.just(clientQuotaService.list(getCluster(clusterName)).map(this::mapToDto)) + .map(ResponseEntity::ok); + + return validateAccess(context) + .then(operation) + .doOnEach(sig -> audit(context, sig)); } @Override public Mono> upsertClientQuotas(String clusterName, - Mono clientQuotasDto, + Mono quotasDto, ServerWebExchange exchange) { - return clientQuotasDto.flatMap( - quotas -> - quotaService.upsert( + var context = AccessContext.builder() + .cluster(clusterName) + .operationName("upsertClientQuotas") + .clientQuotaActions(ClientQuotaAction.EDIT) + .build(); + + Mono> operation = quotasDto.flatMap( + newQuotas -> + clientQuotaService.upsert( getCluster(clusterName), - quotas.getUser(), - quotas.getClientId(), - quotas.getIp(), - Optional.ofNullable(quotas.getQuotas()).orElse(Map.of()) + newQuotas.getUser(), + newQuotas.getClientId(), + newQuotas.getIp(), + Optional.ofNullable(newQuotas.getQuotas()).orElse(Map.of()) .entrySet() .stream() .collect(toMap(Map.Entry::getKey, e -> e.getValue().doubleValue())) ) ).map(statusCode -> ResponseEntity.status(statusCode).build()); + + return validateAccess(context) + .then(operation) + .doOnEach(sig -> audit(context, sig)); } - private ClientQuotasDTO map(ClientQuotaRecord quotaRecord) { + private ClientQuotasDTO mapToDto(ClientQuotaRecord quotaRecord) { return new ClientQuotasDTO() .user(quotaRecord.user()) .clientId(quotaRecord.clientId()) diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/rbac/AccessContext.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/rbac/AccessContext.java index cf126bf3df..75560fddb4 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/rbac/AccessContext.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/rbac/AccessContext.java @@ -3,6 +3,7 @@ package com.provectus.kafka.ui.model.rbac; import com.provectus.kafka.ui.model.rbac.permission.AclAction; import com.provectus.kafka.ui.model.rbac.permission.ApplicationConfigAction; import com.provectus.kafka.ui.model.rbac.permission.AuditAction; +import com.provectus.kafka.ui.model.rbac.permission.ClientQuotaAction; import com.provectus.kafka.ui.model.rbac.permission.ClusterConfigAction; import com.provectus.kafka.ui.model.rbac.permission.ConnectAction; import com.provectus.kafka.ui.model.rbac.permission.ConsumerGroupAction; @@ -44,6 +45,8 @@ public class AccessContext { Collection auditAction; + Collection clientQuotaActions; + String operationName; Object operationParams; @@ -67,6 +70,7 @@ public class AccessContext { private Collection ksqlActions = Collections.emptySet(); private Collection aclActions = Collections.emptySet(); private Collection auditActions = Collections.emptySet(); + private Collection clientQuotaActions = Collections.emptySet(); private String operationName; private Object operationParams; @@ -158,6 +162,12 @@ public class AccessContext { return this; } + public AccessContextBuilder clientQuotaActions(ClientQuotaAction... actions) { + Assert.isTrue(actions.length > 0, "actions not present"); + this.clientQuotaActions = List.of(actions); + return this; + } + public AccessContextBuilder operationName(String operationName) { this.operationName = operationName; return this; @@ -182,7 +192,7 @@ public class AccessContext { connect, connectActions, connector, schema, schemaActions, - ksqlActions, aclActions, auditActions, + ksqlActions, aclActions, auditActions, clientQuotaActions, operationName, operationParams); } } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/rbac/Permission.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/rbac/Permission.java index 56b0a09802..95810cbeda 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/rbac/Permission.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/rbac/Permission.java @@ -3,12 +3,14 @@ package com.provectus.kafka.ui.model.rbac; import static com.provectus.kafka.ui.model.rbac.Resource.ACL; import static com.provectus.kafka.ui.model.rbac.Resource.APPLICATIONCONFIG; import static com.provectus.kafka.ui.model.rbac.Resource.AUDIT; +import static com.provectus.kafka.ui.model.rbac.Resource.CLIENT_QUOTAS; import static com.provectus.kafka.ui.model.rbac.Resource.CLUSTERCONFIG; import static com.provectus.kafka.ui.model.rbac.Resource.KSQL; import com.provectus.kafka.ui.model.rbac.permission.AclAction; import com.provectus.kafka.ui.model.rbac.permission.ApplicationConfigAction; import com.provectus.kafka.ui.model.rbac.permission.AuditAction; +import com.provectus.kafka.ui.model.rbac.permission.ClientQuotaAction; import com.provectus.kafka.ui.model.rbac.permission.ClusterConfigAction; import com.provectus.kafka.ui.model.rbac.permission.ConnectAction; import com.provectus.kafka.ui.model.rbac.permission.ConsumerGroupAction; @@ -32,7 +34,7 @@ import org.springframework.util.Assert; public class Permission { private static final List RBAC_ACTION_EXEMPT_LIST = - List.of(KSQL, CLUSTERCONFIG, APPLICATIONCONFIG, ACL, AUDIT); + List.of(KSQL, CLUSTERCONFIG, APPLICATIONCONFIG, ACL, AUDIT, CLIENT_QUOTAS); Resource resource; List actions; @@ -88,6 +90,7 @@ public class Permission { case KSQL -> Arrays.stream(KsqlAction.values()).map(Enum::toString).toList(); case ACL -> Arrays.stream(AclAction.values()).map(Enum::toString).toList(); case AUDIT -> Arrays.stream(AuditAction.values()).map(Enum::toString).toList(); + case CLIENT_QUOTAS -> Arrays.stream(ClientQuotaAction.values()).map(Enum::toString).toList(); }; } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/rbac/Resource.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/rbac/Resource.java index ca2efab3a9..f724c4326f 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/rbac/Resource.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/rbac/Resource.java @@ -13,7 +13,8 @@ public enum Resource { CONNECT, KSQL, ACL, - AUDIT; + AUDIT, + CLIENT_QUOTAS; @Nullable public static Resource fromString(String name) { diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/rbac/permission/ClientQuotaAction.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/rbac/permission/ClientQuotaAction.java new file mode 100644 index 0000000000..1b451f7c7c --- /dev/null +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/rbac/permission/ClientQuotaAction.java @@ -0,0 +1,18 @@ +package com.provectus.kafka.ui.model.rbac.permission; + +import java.util.Set; + +public enum ClientQuotaAction implements PermissibleAction { + + VIEW, + EDIT + + ; + + public static final Set ALTER_ACTIONS = Set.of(EDIT); + + public boolean isAlter() { + return ALTER_ACTIONS.contains(this); + } + +} diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/rbac/permission/PermissibleAction.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/rbac/permission/PermissibleAction.java index 5de78a1517..1f8952d57a 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/rbac/permission/PermissibleAction.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/rbac/permission/PermissibleAction.java @@ -4,5 +4,5 @@ public sealed interface PermissibleAction permits AclAction, ApplicationConfigAction, ConsumerGroupAction, SchemaAction, ConnectAction, ClusterConfigAction, - KsqlAction, TopicAction, AuditAction { + KsqlAction, TopicAction, AuditAction, ClientQuotaAction { } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/quota/ClientQuotaRecord.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/quota/ClientQuotaRecord.java index 8dc0e49e45..820ec56043 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/quota/ClientQuotaRecord.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/quota/ClientQuotaRecord.java @@ -1,6 +1,7 @@ package com.provectus.kafka.ui.service.quota; import jakarta.annotation.Nullable; +import java.util.Comparator; import java.util.Map; import org.apache.kafka.common.quota.ClientQuotaEntity; @@ -9,12 +10,17 @@ public record ClientQuotaRecord(@Nullable String user, @Nullable String ip, Map quotas) { - static ClientQuotaRecord create(ClientQuotaEntity entity, Map qoutas) { + static final Comparator COMPARATOR = + Comparator.comparing(r -> r.user) + .thenComparing(r -> r.clientId) + .thenComparing(r -> r.ip); + + static ClientQuotaRecord create(ClientQuotaEntity entity, Map quotas) { return new ClientQuotaRecord( entity.entries().get(ClientQuotaEntity.USER), entity.entries().get(ClientQuotaEntity.CLIENT_ID), entity.entries().get(ClientQuotaEntity.IP), - qoutas + quotas ); } } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/quota/QuotaService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/quota/ClientQuotaService.java similarity index 65% rename from kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/quota/QuotaService.java rename to kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/quota/ClientQuotaService.java index 2d20ca48a1..f132cf17ea 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/quota/QuotaService.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/quota/ClientQuotaService.java @@ -24,14 +24,13 @@ import org.apache.kafka.common.quota.ClientQuotaEntity; import org.apache.kafka.common.quota.ClientQuotaFilter; import org.apache.kafka.common.quota.ClientQuotaFilterComponent; import org.springframework.http.HttpStatus; -import org.springframework.http.HttpStatusCode; import org.springframework.stereotype.Service; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @Service @RequiredArgsConstructor -public class QuotaService { +public class ClientQuotaService { private final AdminClientService adminClientService; @@ -39,36 +38,29 @@ public class QuotaService { return adminClientService.get(cluster) .flatMap(ac -> ac.getClientQuotas(ClientQuotaFilter.all())) .flatMapIterable(map -> - map.entrySet().stream().map(e -> ClientQuotaRecord.create(e.getKey(), e.getValue())).toList()); + map.entrySet().stream().map(e -> ClientQuotaRecord.create(e.getKey(), e.getValue())).toList()) + .sort(ClientQuotaRecord.COMPARATOR); } - //returns 201 is new entity was created, 200 if existing was updated, 204 if it was deleted + //returns 201 if new entity was created, 200 if existing was updated, 204 if existing was deleted public Mono upsert(KafkaCluster cluster, - @Nullable String user, - @Nullable String clientId, - @Nullable String ip, - Map newQuotas) { + @Nullable String user, + @Nullable String clientId, + @Nullable String ip, + Map newQuotas) { ClientQuotaEntity quotaEntity = quotaEntity(user, clientId, ip); return adminClientService.get(cluster) .flatMap(ac -> findQuotas(ac, quotaEntity) .flatMap(currentQuotas -> { - Set quotasToClear = Sets.difference(currentQuotas.keySet(), newQuotas.keySet()); - List ops = Stream.concat( - quotasToClear.stream() - //setting null value to clear current state - .map(name -> new ClientQuotaAlteration.Op(name, null)), - newQuotas.entrySet().stream() - .map(e -> new ClientQuotaAlteration.Op(e.getKey(), e.getValue())) - ).toList(); - HttpStatus result = HttpStatus.OK; //updated if (newQuotas.isEmpty()) { result = HttpStatus.NO_CONTENT; //deleted } else if (currentQuotas.isEmpty()) { result = HttpStatus.CREATED; } - return ac.alterClientQuota(new ClientQuotaAlteration(quotaEntity, ops)) + var alteration = createAlteration(quotaEntity, currentQuotas, newQuotas); + return ac.alterClientQuota(alteration) .thenReturn(result); }) ); @@ -85,12 +77,26 @@ public class QuotaService { return new ClientQuotaEntity(id); } - private Mono> findQuotas(ReactiveAdminClient ac, ClientQuotaEntity quotaEntity) { - return ac.getClientQuotas(searchFilter(quotaEntity)) - .map(foundRecords -> Optional.ofNullable(foundRecords.get(quotaEntity)).orElse(Map.of())); + private ClientQuotaAlteration createAlteration(ClientQuotaEntity quotaEntity, + Map currentQuotas, + Map newQuotas) { + Set quotasToClear = Sets.difference(currentQuotas.keySet(), newQuotas.keySet()); + List ops = Stream.concat( + quotasToClear.stream() + .map(name -> new ClientQuotaAlteration.Op(name, null)), //setting null value to clear current state + newQuotas.entrySet().stream() + .map(e -> new ClientQuotaAlteration.Op(e.getKey(), e.getValue())) + ).toList(); + return new ClientQuotaAlteration(quotaEntity, ops); } - private ClientQuotaFilter searchFilter(ClientQuotaEntity quotaEntity) { + // returns empty map if no quotas found for an entity + private Mono> findQuotas(ReactiveAdminClient ac, ClientQuotaEntity quotaEntity) { + return ac.getClientQuotas(crateSearchFilter(quotaEntity)) + .map(found -> Optional.ofNullable(found.get(quotaEntity)).orElse(Map.of())); + } + + private ClientQuotaFilter crateSearchFilter(ClientQuotaEntity quotaEntity) { List filters = new ArrayList<>(); quotaEntity.entries().forEach((type, name) -> filters.add(ClientQuotaFilterComponent.ofEntity(type, name))); return ClientQuotaFilter.contains(filters); diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/rbac/AccessControlService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/rbac/AccessControlService.java index 59ea02fea8..69a851ded6 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/rbac/AccessControlService.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/rbac/AccessControlService.java @@ -123,7 +123,8 @@ public class AccessControlService { && isSchemaAccessible(context, user) && isKsqlAccessible(context, user) && isAclAccessible(context, user) - && isAuditAccessible(context, user); + && isAuditAccessible(context, user) + && isClientQuotaAccessible(context, user); if (!accessGranted) { throw new AccessDeniedException(ACCESS_DENIED); @@ -417,6 +418,23 @@ public class AccessControlService { return isAccessible(Resource.AUDIT, null, user, context, requiredActions); } + private boolean isClientQuotaAccessible(AccessContext context, AuthenticatedUser user) { + if (!rbacEnabled) { + return true; + } + + if (context.getClientQuotaActions().isEmpty()) { + return true; + } + + Set requiredActions = context.getClientQuotaActions() + .stream() + .map(a -> a.toString().toUpperCase()) + .collect(Collectors.toSet()); + + return isAccessible(Resource.CLIENT_QUOTAS, null, user, context, requiredActions); + } + public Set getOauthExtractors() { return oauthExtractors; } diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/quota/ClientQuotaServiceTest.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/quota/ClientQuotaServiceTest.java new file mode 100644 index 0000000000..1ca42f9de8 --- /dev/null +++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/quota/ClientQuotaServiceTest.java @@ -0,0 +1,11 @@ +package com.provectus.kafka.ui.service.quota; + +import com.provectus.kafka.ui.AbstractIntegrationTest; +import org.springframework.beans.factory.annotation.Autowired; + +class ClientQuotaServiceTest extends AbstractIntegrationTest { + + @Autowired + ClientQuotaService quotaService; + +} diff --git a/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml b/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml index 09bbb7fc7e..57a0550616 100644 --- a/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml +++ b/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml @@ -1936,6 +1936,9 @@ paths: - ClientQuotas summary: upsertClientQuotas operationId: upsertClientQuotas + description: | + - updates/creates client quota record if `quotas` field is non-empty + - deletes client quota record if `quotas` field is null or empty parameters: - name: clusterName in: path @@ -1951,7 +1954,7 @@ paths: 200: description: Existing quota updated 201: - description: Quota created + description: New quota created 204: description: Existing quota deleted