From 1586f75d9ff23a8f00886c82aff7f05ccc0878ac Mon Sep 17 00:00:00 2001 From: iliax Date: Fri, 11 Aug 2023 16:49:59 +0400 Subject: [PATCH] wip --- .../ui/controller/ClientQuotasController.java | 20 ++++-- .../kafka/ui/service/quota/QuotaService.java | 69 +++++++++++++++++-- .../main/resources/swagger/kafka-ui-api.yaml | 4 +- 3 files changed, 80 insertions(+), 13 deletions(-) diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/ClientQuotasController.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/ClientQuotasController.java index 709e82bf94..acb97496ed 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/ClientQuotasController.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/ClientQuotasController.java @@ -9,9 +9,9 @@ import com.provectus.kafka.ui.service.quota.ClientQuotaRecord; import com.provectus.kafka.ui.service.quota.QuotaService; import com.provectus.kafka.ui.service.rbac.AccessControlService; import java.math.BigDecimal; +import java.util.LinkedHashMap; import java.util.Map; import java.util.Optional; -import java.util.stream.Collectors; import lombok.RequiredArgsConstructor; import org.springframework.http.ResponseEntity; import org.springframework.web.bind.annotation.RestController; @@ -30,16 +30,15 @@ public class ClientQuotasController extends AbstractController implements Client @Override public Mono>> listQuotas(String clusterName, ServerWebExchange exchange) { - return Mono.just(quotaService.all(getCluster(clusterName)).map(this::map)) + return Mono.just(quotaService.list(getCluster(clusterName)).map(this::map)) .map(ResponseEntity::ok); } @Override public Mono> upsertClientQuotas(String clusterName, - Mono clientQuotasDTO, + Mono clientQuotasDto, ServerWebExchange exchange) { - - return clientQuotasDTO.flatMap( + return clientQuotasDto.flatMap( quotas -> quotaService.upsert( getCluster(clusterName), @@ -60,8 +59,15 @@ public class ClientQuotasController extends AbstractController implements Client .clientId(quotaRecord.clientId()) .ip(quotaRecord.ip()) .quotas( - quotaRecord.quotas().entrySet().stream() - .collect(toMap(Map.Entry::getKey, e -> BigDecimal.valueOf(e.getValue()))) + quotaRecord.quotas().entrySet() + .stream() + .sorted(Map.Entry.comparingByKey()) + .collect(toMap( + Map.Entry::getKey, + e -> BigDecimal.valueOf(e.getValue()), + (v1, v2) -> null, //won't be called + LinkedHashMap::new //to keep order + )) ); } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/quota/QuotaService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/quota/QuotaService.java index 3ff8ac5f69..2d20ca48a1 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/quota/QuotaService.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/quota/QuotaService.java @@ -1,15 +1,29 @@ package com.provectus.kafka.ui.service.quota; +import static org.apache.kafka.common.quota.ClientQuotaEntity.CLIENT_ID; +import static org.apache.kafka.common.quota.ClientQuotaEntity.IP; +import static org.apache.kafka.common.quota.ClientQuotaEntity.USER; + +import com.google.common.collect.Sets; +import com.provectus.kafka.ui.exception.ValidationException; import com.provectus.kafka.ui.model.KafkaCluster; import com.provectus.kafka.ui.service.AdminClientService; +import com.provectus.kafka.ui.service.ReactiveAdminClient; import jakarta.annotation.Nullable; -import java.math.BigDecimal; +import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Optional; +import java.util.Set; +import java.util.stream.Stream; import lombok.RequiredArgsConstructor; +import org.apache.kafka.common.quota.ClientQuotaAlteration; import org.apache.kafka.common.quota.ClientQuotaEntity; import org.apache.kafka.common.quota.ClientQuotaFilter; +import org.apache.kafka.common.quota.ClientQuotaFilterComponent; +import org.springframework.http.HttpStatus; import org.springframework.http.HttpStatusCode; import org.springframework.stereotype.Service; import reactor.core.publisher.Flux; @@ -21,19 +35,64 @@ public class QuotaService { private final AdminClientService adminClientService; - public Flux all(KafkaCluster cluster) { + public Flux list(KafkaCluster cluster) { return adminClientService.get(cluster) .flatMap(ac -> ac.getClientQuotas(ClientQuotaFilter.all())) .flatMapIterable(map -> map.entrySet().stream().map(e -> ClientQuotaRecord.create(e.getKey(), e.getValue())).toList()); } - //returns 201 is new entity was created, 204 if exsiting was updated - public Mono upsert(KafkaCluster cluster, + //returns 201 is new entity was created, 200 if existing was updated, 204 if it was deleted + public Mono upsert(KafkaCluster cluster, @Nullable String user, @Nullable String clientId, @Nullable String ip, - Map quotas) { + Map newQuotas) { + ClientQuotaEntity quotaEntity = quotaEntity(user, clientId, ip); + return adminClientService.get(cluster) + .flatMap(ac -> + findQuotas(ac, quotaEntity) + .flatMap(currentQuotas -> { + Set quotasToClear = Sets.difference(currentQuotas.keySet(), newQuotas.keySet()); + List ops = Stream.concat( + quotasToClear.stream() + //setting null value to clear current state + .map(name -> new ClientQuotaAlteration.Op(name, null)), + newQuotas.entrySet().stream() + .map(e -> new ClientQuotaAlteration.Op(e.getKey(), e.getValue())) + ).toList(); + HttpStatus result = HttpStatus.OK; //updated + if (newQuotas.isEmpty()) { + result = HttpStatus.NO_CONTENT; //deleted + } else if (currentQuotas.isEmpty()) { + result = HttpStatus.CREATED; + } + return ac.alterClientQuota(new ClientQuotaAlteration(quotaEntity, ops)) + .thenReturn(result); + }) + ); + } + + private ClientQuotaEntity quotaEntity(@Nullable String user, @Nullable String clientId, @Nullable String ip) { + if (Stream.of(user, clientId, ip).allMatch(Objects::isNull)) { + throw new ValidationException("Quota entity id is not set"); + } + var id = new HashMap(); + Optional.ofNullable(user).ifPresent(u -> id.put(USER, u)); + Optional.ofNullable(clientId).ifPresent(cid -> id.put(CLIENT_ID, cid)); + Optional.ofNullable(ip).ifPresent(i -> id.put(IP, i)); + return new ClientQuotaEntity(id); + } + + private Mono> findQuotas(ReactiveAdminClient ac, ClientQuotaEntity quotaEntity) { + return ac.getClientQuotas(searchFilter(quotaEntity)) + .map(foundRecords -> Optional.ofNullable(foundRecords.get(quotaEntity)).orElse(Map.of())); + } + + private ClientQuotaFilter searchFilter(ClientQuotaEntity quotaEntity) { + List filters = new ArrayList<>(); + quotaEntity.entries().forEach((type, name) -> filters.add(ClientQuotaFilterComponent.ofEntity(type, name))); + return ClientQuotaFilter.contains(filters); } } diff --git a/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml b/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml index 8f68bb049f..09bbb7fc7e 100644 --- a/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml +++ b/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml @@ -1948,10 +1948,12 @@ paths: schema: $ref: '#/components/schemas/ClientQuotas' responses: + 200: + description: Existing quota updated 201: description: Quota created 204: - description: Existing quota updated + description: Existing quota deleted /api/clusters/{clusterName}/acl/streamApp: