|
@@ -24,14 +24,13 @@ import org.apache.kafka.common.quota.ClientQuotaEntity;
|
|
import org.apache.kafka.common.quota.ClientQuotaFilter;
|
|
import org.apache.kafka.common.quota.ClientQuotaFilter;
|
|
import org.apache.kafka.common.quota.ClientQuotaFilterComponent;
|
|
import org.apache.kafka.common.quota.ClientQuotaFilterComponent;
|
|
import org.springframework.http.HttpStatus;
|
|
import org.springframework.http.HttpStatus;
|
|
-import org.springframework.http.HttpStatusCode;
|
|
|
|
import org.springframework.stereotype.Service;
|
|
import org.springframework.stereotype.Service;
|
|
import reactor.core.publisher.Flux;
|
|
import reactor.core.publisher.Flux;
|
|
import reactor.core.publisher.Mono;
|
|
import reactor.core.publisher.Mono;
|
|
|
|
|
|
@Service
|
|
@Service
|
|
@RequiredArgsConstructor
|
|
@RequiredArgsConstructor
|
|
-public class QuotaService {
|
|
|
|
|
|
+public class ClientQuotaService {
|
|
|
|
|
|
private final AdminClientService adminClientService;
|
|
private final AdminClientService adminClientService;
|
|
|
|
|
|
@@ -39,36 +38,29 @@ public class QuotaService {
|
|
return adminClientService.get(cluster)
|
|
return adminClientService.get(cluster)
|
|
.flatMap(ac -> ac.getClientQuotas(ClientQuotaFilter.all()))
|
|
.flatMap(ac -> ac.getClientQuotas(ClientQuotaFilter.all()))
|
|
.flatMapIterable(map ->
|
|
.flatMapIterable(map ->
|
|
- map.entrySet().stream().map(e -> ClientQuotaRecord.create(e.getKey(), e.getValue())).toList());
|
|
|
|
|
|
+ map.entrySet().stream().map(e -> ClientQuotaRecord.create(e.getKey(), e.getValue())).toList())
|
|
|
|
+ .sort(ClientQuotaRecord.COMPARATOR);
|
|
}
|
|
}
|
|
|
|
|
|
- //returns 201 is new entity was created, 200 if existing was updated, 204 if it was deleted
|
|
|
|
|
|
+ //returns 201 if new entity was created, 200 if existing was updated, 204 if existing was deleted
|
|
public Mono<HttpStatus> upsert(KafkaCluster cluster,
|
|
public Mono<HttpStatus> upsert(KafkaCluster cluster,
|
|
- @Nullable String user,
|
|
|
|
- @Nullable String clientId,
|
|
|
|
- @Nullable String ip,
|
|
|
|
- Map<String, Double> newQuotas) {
|
|
|
|
|
|
+ @Nullable String user,
|
|
|
|
+ @Nullable String clientId,
|
|
|
|
+ @Nullable String ip,
|
|
|
|
+ Map<String, Double> newQuotas) {
|
|
ClientQuotaEntity quotaEntity = quotaEntity(user, clientId, ip);
|
|
ClientQuotaEntity quotaEntity = quotaEntity(user, clientId, ip);
|
|
return adminClientService.get(cluster)
|
|
return adminClientService.get(cluster)
|
|
.flatMap(ac ->
|
|
.flatMap(ac ->
|
|
findQuotas(ac, quotaEntity)
|
|
findQuotas(ac, quotaEntity)
|
|
.flatMap(currentQuotas -> {
|
|
.flatMap(currentQuotas -> {
|
|
- Set<String> quotasToClear = Sets.difference(currentQuotas.keySet(), newQuotas.keySet());
|
|
|
|
- List<ClientQuotaAlteration.Op> ops = Stream.concat(
|
|
|
|
- quotasToClear.stream()
|
|
|
|
- //setting null value to clear current state
|
|
|
|
- .map(name -> new ClientQuotaAlteration.Op(name, null)),
|
|
|
|
- newQuotas.entrySet().stream()
|
|
|
|
- .map(e -> new ClientQuotaAlteration.Op(e.getKey(), e.getValue()))
|
|
|
|
- ).toList();
|
|
|
|
-
|
|
|
|
HttpStatus result = HttpStatus.OK; //updated
|
|
HttpStatus result = HttpStatus.OK; //updated
|
|
if (newQuotas.isEmpty()) {
|
|
if (newQuotas.isEmpty()) {
|
|
result = HttpStatus.NO_CONTENT; //deleted
|
|
result = HttpStatus.NO_CONTENT; //deleted
|
|
} else if (currentQuotas.isEmpty()) {
|
|
} else if (currentQuotas.isEmpty()) {
|
|
result = HttpStatus.CREATED;
|
|
result = HttpStatus.CREATED;
|
|
}
|
|
}
|
|
- return ac.alterClientQuota(new ClientQuotaAlteration(quotaEntity, ops))
|
|
|
|
|
|
+ var alteration = createAlteration(quotaEntity, currentQuotas, newQuotas);
|
|
|
|
+ return ac.alterClientQuota(alteration)
|
|
.thenReturn(result);
|
|
.thenReturn(result);
|
|
})
|
|
})
|
|
);
|
|
);
|
|
@@ -85,12 +77,26 @@ public class QuotaService {
|
|
return new ClientQuotaEntity(id);
|
|
return new ClientQuotaEntity(id);
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ private ClientQuotaAlteration createAlteration(ClientQuotaEntity quotaEntity,
|
|
|
|
+ Map<String, Double> currentQuotas,
|
|
|
|
+ Map<String, Double> newQuotas) {
|
|
|
|
+ Set<String> quotasToClear = Sets.difference(currentQuotas.keySet(), newQuotas.keySet());
|
|
|
|
+ List<ClientQuotaAlteration.Op> ops = Stream.concat(
|
|
|
|
+ quotasToClear.stream()
|
|
|
|
+ .map(name -> new ClientQuotaAlteration.Op(name, null)), //setting null value to clear current state
|
|
|
|
+ newQuotas.entrySet().stream()
|
|
|
|
+ .map(e -> new ClientQuotaAlteration.Op(e.getKey(), e.getValue()))
|
|
|
|
+ ).toList();
|
|
|
|
+ return new ClientQuotaAlteration(quotaEntity, ops);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ // returns empty map if no quotas found for an entity
|
|
private Mono<Map<String, Double>> findQuotas(ReactiveAdminClient ac, ClientQuotaEntity quotaEntity) {
|
|
private Mono<Map<String, Double>> findQuotas(ReactiveAdminClient ac, ClientQuotaEntity quotaEntity) {
|
|
- return ac.getClientQuotas(searchFilter(quotaEntity))
|
|
|
|
- .map(foundRecords -> Optional.ofNullable(foundRecords.get(quotaEntity)).orElse(Map.of()));
|
|
|
|
|
|
+ return ac.getClientQuotas(crateSearchFilter(quotaEntity))
|
|
|
|
+ .map(found -> Optional.ofNullable(found.get(quotaEntity)).orElse(Map.of()));
|
|
}
|
|
}
|
|
|
|
|
|
- private ClientQuotaFilter searchFilter(ClientQuotaEntity quotaEntity) {
|
|
|
|
|
|
+ private ClientQuotaFilter crateSearchFilter(ClientQuotaEntity quotaEntity) {
|
|
List<ClientQuotaFilterComponent> filters = new ArrayList<>();
|
|
List<ClientQuotaFilterComponent> filters = new ArrayList<>();
|
|
quotaEntity.entries().forEach((type, name) -> filters.add(ClientQuotaFilterComponent.ofEntity(type, name)));
|
|
quotaEntity.entries().forEach((type, name) -> filters.add(ClientQuotaFilterComponent.ofEntity(type, name)));
|
|
return ClientQuotaFilter.contains(filters);
|
|
return ClientQuotaFilter.contains(filters);
|