wip
This commit is contained in:
parent
1336ec33cb
commit
26c21f733d
3 changed files with 10 additions and 10 deletions
|
@ -42,7 +42,7 @@ public class ClientQuotasController extends AbstractController implements Client
|
|||
|
||||
Mono<ResponseEntity<Flux<ClientQuotasDTO>>> operation =
|
||||
Mono.just(
|
||||
clientQuotaService.list(getCluster(clusterName))
|
||||
clientQuotaService.getAll(getCluster(clusterName))
|
||||
.sort(QUOTA_RECORDS_COMPARATOR)
|
||||
.map(this::mapToDto)
|
||||
).map(ResponseEntity::ok);
|
||||
|
|
|
@ -34,7 +34,7 @@ public class ClientQuotaService {
|
|||
|
||||
private final AdminClientService adminClientService;
|
||||
|
||||
public Flux<ClientQuotaRecord> list(KafkaCluster cluster) {
|
||||
public Flux<ClientQuotaRecord> getAll(KafkaCluster cluster) {
|
||||
return adminClientService.get(cluster)
|
||||
.flatMap(ac -> ac.getClientQuotas(ClientQuotaFilter.all()))
|
||||
.flatMapIterable(Map::entrySet)
|
||||
|
|
|
@ -34,7 +34,7 @@ class ClientQuotaServiceTest extends AbstractIntegrationTest {
|
|||
nullValues = "null"
|
||||
)
|
||||
void createUpdateDelete(String user, String clientId, String ip) {
|
||||
var quotas = Map.of(
|
||||
var initialQuotas = Map.of(
|
||||
"producer_byte_rate", 123.0,
|
||||
"consumer_byte_rate", 234.0,
|
||||
"request_percentage", 10.0
|
||||
|
@ -42,22 +42,22 @@ class ClientQuotaServiceTest extends AbstractIntegrationTest {
|
|||
|
||||
//creating new
|
||||
StepVerifier.create(
|
||||
quotaService.upsert(cluster, user, clientId, ip, quotas)
|
||||
quotaService.upsert(cluster, user, clientId, ip, initialQuotas)
|
||||
)
|
||||
.assertNext(status -> assertThat(status.value()).isEqualTo(201))
|
||||
.verifyComplete();
|
||||
|
||||
assertThat(quotaRecordExisits(new ClientQuotaRecord(user, clientId, ip, quotas)))
|
||||
assertThat(quotaRecordExists(new ClientQuotaRecord(user, clientId, ip, initialQuotas)))
|
||||
.isTrue();
|
||||
|
||||
//updating
|
||||
StepVerifier.create(
|
||||
quotaService.upsert(cluster, user, clientId, ip, Map.of("producer_byte_rate", 111111.0))
|
||||
quotaService.upsert(cluster, user, clientId, ip, Map.of("producer_byte_rate", 22222.0))
|
||||
)
|
||||
.assertNext(status -> assertThat(status.value()).isEqualTo(200))
|
||||
.verifyComplete();
|
||||
|
||||
assertThat(quotaRecordExisits(new ClientQuotaRecord(user, clientId, ip, Map.of("producer_byte_rate", 111111.0))))
|
||||
assertThat(quotaRecordExists(new ClientQuotaRecord(user, clientId, ip, Map.of("producer_byte_rate", 22222.0))))
|
||||
.isTrue();
|
||||
|
||||
//deleting created record
|
||||
|
@ -67,12 +67,12 @@ class ClientQuotaServiceTest extends AbstractIntegrationTest {
|
|||
.assertNext(status -> assertThat(status.value()).isEqualTo(204))
|
||||
.verifyComplete();
|
||||
|
||||
assertThat(quotaRecordExisits(new ClientQuotaRecord(user, clientId, ip, Map.of("producer_byte_rate", 111111.0))))
|
||||
assertThat(quotaRecordExists(new ClientQuotaRecord(user, clientId, ip, Map.of("producer_byte_rate", 22222.0))))
|
||||
.isFalse();
|
||||
}
|
||||
|
||||
private boolean quotaRecordExisits(ClientQuotaRecord rec) {
|
||||
return quotaService.list(cluster).collectList().block().contains(rec);
|
||||
private boolean quotaRecordExists(ClientQuotaRecord rec) {
|
||||
return quotaService.getAll(cluster).collectList().block().contains(rec);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
Loading…
Add table
Reference in a new issue