|
@@ -1,15 +1,29 @@
|
|
|
package com.provectus.kafka.ui.service.quota;
|
|
|
|
|
|
+import static org.apache.kafka.common.quota.ClientQuotaEntity.CLIENT_ID;
|
|
|
+import static org.apache.kafka.common.quota.ClientQuotaEntity.IP;
|
|
|
+import static org.apache.kafka.common.quota.ClientQuotaEntity.USER;
|
|
|
+
|
|
|
+import com.google.common.collect.Sets;
|
|
|
+import com.provectus.kafka.ui.exception.ValidationException;
|
|
|
import com.provectus.kafka.ui.model.KafkaCluster;
|
|
|
import com.provectus.kafka.ui.service.AdminClientService;
|
|
|
+import com.provectus.kafka.ui.service.ReactiveAdminClient;
|
|
|
import jakarta.annotation.Nullable;
|
|
|
-import java.math.BigDecimal;
|
|
|
+import java.util.ArrayList;
|
|
|
+import java.util.HashMap;
|
|
|
import java.util.List;
|
|
|
import java.util.Map;
|
|
|
+import java.util.Objects;
|
|
|
import java.util.Optional;
|
|
|
+import java.util.Set;
|
|
|
+import java.util.stream.Stream;
|
|
|
import lombok.RequiredArgsConstructor;
|
|
|
+import org.apache.kafka.common.quota.ClientQuotaAlteration;
|
|
|
import org.apache.kafka.common.quota.ClientQuotaEntity;
|
|
|
import org.apache.kafka.common.quota.ClientQuotaFilter;
|
|
|
+import org.apache.kafka.common.quota.ClientQuotaFilterComponent;
|
|
|
+import org.springframework.http.HttpStatus;
|
|
|
import org.springframework.http.HttpStatusCode;
|
|
|
import org.springframework.stereotype.Service;
|
|
|
import reactor.core.publisher.Flux;
|
|
@@ -21,19 +35,64 @@ public class QuotaService {
|
|
|
|
|
|
private final AdminClientService adminClientService;
|
|
|
|
|
|
- public Flux<ClientQuotaRecord> all(KafkaCluster cluster) {
|
|
|
+ public Flux<ClientQuotaRecord> list(KafkaCluster cluster) {
|
|
|
return adminClientService.get(cluster)
|
|
|
.flatMap(ac -> ac.getClientQuotas(ClientQuotaFilter.all()))
|
|
|
.flatMapIterable(map ->
|
|
|
map.entrySet().stream().map(e -> ClientQuotaRecord.create(e.getKey(), e.getValue())).toList());
|
|
|
}
|
|
|
|
|
|
- //returns 201 is new entity was created, 204 if exsiting was updated
|
|
|
- public Mono<HttpStatusCode> upsert(KafkaCluster cluster,
|
|
|
+ //returns 201 is new entity was created, 200 if existing was updated, 204 if it was deleted
|
|
|
+ public Mono<HttpStatus> upsert(KafkaCluster cluster,
|
|
|
@Nullable String user,
|
|
|
@Nullable String clientId,
|
|
|
@Nullable String ip,
|
|
|
- Map<String, Double> quotas) {
|
|
|
+ Map<String, Double> newQuotas) {
|
|
|
+ ClientQuotaEntity quotaEntity = quotaEntity(user, clientId, ip);
|
|
|
+ return adminClientService.get(cluster)
|
|
|
+ .flatMap(ac ->
|
|
|
+ findQuotas(ac, quotaEntity)
|
|
|
+ .flatMap(currentQuotas -> {
|
|
|
+ Set<String> quotasToClear = Sets.difference(currentQuotas.keySet(), newQuotas.keySet());
|
|
|
+ List<ClientQuotaAlteration.Op> ops = Stream.concat(
|
|
|
+ quotasToClear.stream()
|
|
|
+ //setting null value to clear current state
|
|
|
+ .map(name -> new ClientQuotaAlteration.Op(name, null)),
|
|
|
+ newQuotas.entrySet().stream()
|
|
|
+ .map(e -> new ClientQuotaAlteration.Op(e.getKey(), e.getValue()))
|
|
|
+ ).toList();
|
|
|
+
|
|
|
+ HttpStatus result = HttpStatus.OK; //updated
|
|
|
+ if (newQuotas.isEmpty()) {
|
|
|
+ result = HttpStatus.NO_CONTENT; //deleted
|
|
|
+ } else if (currentQuotas.isEmpty()) {
|
|
|
+ result = HttpStatus.CREATED;
|
|
|
+ }
|
|
|
+ return ac.alterClientQuota(new ClientQuotaAlteration(quotaEntity, ops))
|
|
|
+ .thenReturn(result);
|
|
|
+ })
|
|
|
+ );
|
|
|
+ }
|
|
|
+
|
|
|
+ private ClientQuotaEntity quotaEntity(@Nullable String user, @Nullable String clientId, @Nullable String ip) {
|
|
|
+ if (Stream.of(user, clientId, ip).allMatch(Objects::isNull)) {
|
|
|
+ throw new ValidationException("Quota entity id is not set");
|
|
|
+ }
|
|
|
+ var id = new HashMap<String, String>();
|
|
|
+ Optional.ofNullable(user).ifPresent(u -> id.put(USER, u));
|
|
|
+ Optional.ofNullable(clientId).ifPresent(cid -> id.put(CLIENT_ID, cid));
|
|
|
+ Optional.ofNullable(ip).ifPresent(i -> id.put(IP, i));
|
|
|
+ return new ClientQuotaEntity(id);
|
|
|
+ }
|
|
|
+
|
|
|
+ private Mono<Map<String, Double>> findQuotas(ReactiveAdminClient ac, ClientQuotaEntity quotaEntity) {
|
|
|
+ return ac.getClientQuotas(searchFilter(quotaEntity))
|
|
|
+ .map(foundRecords -> Optional.ofNullable(foundRecords.get(quotaEntity)).orElse(Map.of()));
|
|
|
+ }
|
|
|
|
|
|
+ private ClientQuotaFilter searchFilter(ClientQuotaEntity quotaEntity) {
|
|
|
+ List<ClientQuotaFilterComponent> filters = new ArrayList<>();
|
|
|
+ quotaEntity.entries().forEach((type, name) -> filters.add(ClientQuotaFilterComponent.ofEntity(type, name)));
|
|
|
+ return ClientQuotaFilter.contains(filters);
|
|
|
}
|
|
|
}
|