wip
This commit is contained in:
parent
551357207e
commit
8e50d59a44
3 changed files with 53 additions and 3 deletions
|
@ -4,6 +4,7 @@ import static org.apache.kafka.common.quota.ClientQuotaEntity.CLIENT_ID;
|
||||||
import static org.apache.kafka.common.quota.ClientQuotaEntity.IP;
|
import static org.apache.kafka.common.quota.ClientQuotaEntity.IP;
|
||||||
import static org.apache.kafka.common.quota.ClientQuotaEntity.USER;
|
import static org.apache.kafka.common.quota.ClientQuotaEntity.USER;
|
||||||
|
|
||||||
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
import com.google.common.collect.Sets;
|
import com.google.common.collect.Sets;
|
||||||
import com.provectus.kafka.ui.exception.ValidationException;
|
import com.provectus.kafka.ui.exception.ValidationException;
|
||||||
import com.provectus.kafka.ui.model.KafkaCluster;
|
import com.provectus.kafka.ui.model.KafkaCluster;
|
||||||
|
@ -38,8 +39,11 @@ public class ClientQuotaService {
|
||||||
return adminClientService.get(cluster)
|
return adminClientService.get(cluster)
|
||||||
.flatMap(ac -> ac.getClientQuotas(ClientQuotaFilter.all()))
|
.flatMap(ac -> ac.getClientQuotas(ClientQuotaFilter.all()))
|
||||||
.flatMapIterable(map ->
|
.flatMapIterable(map ->
|
||||||
map.entrySet().stream().map(e -> ClientQuotaRecord.create(e.getKey(), e.getValue())).toList())
|
map.entrySet().stream()
|
||||||
.sort(ClientQuotaRecord.COMPARATOR);
|
.map(e -> ClientQuotaRecord.create(e.getKey(), e.getValue()))
|
||||||
|
.sorted(ClientQuotaRecord.COMPARATOR)
|
||||||
|
.toList()
|
||||||
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
//returns 201 if new entity was created, 200 if existing was updated, 204 if existing was deleted
|
//returns 201 if new entity was created, 200 if existing was updated, 204 if existing was deleted
|
||||||
|
@ -66,7 +70,8 @@ public class ClientQuotaService {
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
private ClientQuotaEntity quotaEntity(@Nullable String user, @Nullable String clientId, @Nullable String ip) {
|
@VisibleForTesting
|
||||||
|
static ClientQuotaEntity quotaEntity(@Nullable String user, @Nullable String clientId, @Nullable String ip) {
|
||||||
if (Stream.of(user, clientId, ip).allMatch(Objects::isNull)) {
|
if (Stream.of(user, clientId, ip).allMatch(Objects::isNull)) {
|
||||||
throw new ValidationException("Quota entity id is not set");
|
throw new ValidationException("Quota entity id is not set");
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,11 +1,55 @@
|
||||||
package com.provectus.kafka.ui.service.quota;
|
package com.provectus.kafka.ui.service.quota;
|
||||||
|
|
||||||
|
import static org.assertj.core.api.Assertions.assertThat;
|
||||||
|
|
||||||
import com.provectus.kafka.ui.AbstractIntegrationTest;
|
import com.provectus.kafka.ui.AbstractIntegrationTest;
|
||||||
|
import com.provectus.kafka.ui.model.KafkaCluster;
|
||||||
|
import com.provectus.kafka.ui.service.ClustersStorage;
|
||||||
|
import java.util.Map;
|
||||||
|
import org.junit.jupiter.api.BeforeEach;
|
||||||
|
import org.junit.jupiter.params.ParameterizedTest;
|
||||||
|
import org.junit.jupiter.params.provider.CsvSource;
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
|
import reactor.test.StepVerifier;
|
||||||
|
|
||||||
class ClientQuotaServiceTest extends AbstractIntegrationTest {
|
class ClientQuotaServiceTest extends AbstractIntegrationTest {
|
||||||
|
|
||||||
@Autowired
|
@Autowired
|
||||||
ClientQuotaService quotaService;
|
ClientQuotaService quotaService;
|
||||||
|
|
||||||
|
private KafkaCluster cluster;
|
||||||
|
|
||||||
|
@BeforeEach
|
||||||
|
void init() {
|
||||||
|
cluster = applicationContext.getBean(ClustersStorage.class).getClusterByName(LOCAL).get();
|
||||||
|
}
|
||||||
|
|
||||||
|
@ParameterizedTest
|
||||||
|
@CsvSource(
|
||||||
|
value = {
|
||||||
|
"testUser, null, null ",
|
||||||
|
"null, testUserId, null",
|
||||||
|
"testUser2, testUserId2, null",
|
||||||
|
"null, null, 127.0.0.1"
|
||||||
|
},
|
||||||
|
nullValues = "null"
|
||||||
|
)
|
||||||
|
void createsQuotaRecord(String user, String clientId, String ip) {
|
||||||
|
StepVerifier.create(
|
||||||
|
quotaService.upsert(
|
||||||
|
cluster,
|
||||||
|
user,
|
||||||
|
clientId,
|
||||||
|
ip,
|
||||||
|
Map.of(
|
||||||
|
"producer_byte_rate", 123.0,
|
||||||
|
"consumer_byte_rate", 234.0,
|
||||||
|
"request_percentage", 10.0
|
||||||
|
)
|
||||||
|
)
|
||||||
|
)
|
||||||
|
.assertNext(status -> assertThat(status.value()).isEqualTo(201))
|
||||||
|
.verifyComplete();
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -3650,6 +3650,7 @@ components:
|
||||||
- KSQL
|
- KSQL
|
||||||
- ACL
|
- ACL
|
||||||
- AUDIT
|
- AUDIT
|
||||||
|
- CLIENT_QUOTAS
|
||||||
|
|
||||||
KafkaAcl:
|
KafkaAcl:
|
||||||
type: object
|
type: object
|
||||||
|
|
Loading…
Add table
Reference in a new issue