This commit is contained in:
iliax 2023-08-11 14:38:44 +04:00
parent d915de4fd8
commit 3d3845a23b
9 changed files with 363 additions and 130 deletions

View file

@@ -8,146 +8,165 @@ services:
ports:
- 8080:8080
depends_on:
- kafka0
- kafka2
- kafka1
- schemaregistry0
- schemaregistry1
- kafka-connect0
environment:
KAFKA_CLUSTERS_0_NAME: local
KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka0:29092
KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka1:9092
KAFKA_CLUSTERS_0_METRICS_PORT: 9997
KAFKA_CLUSTERS_0_SCHEMAREGISTRY: http://schemaregistry0:8085
KAFKA_CLUSTERS_0_KAFKACONNECT_0_NAME: first
KAFKA_CLUSTERS_0_KAFKACONNECT_0_ADDRESS: http://kafka-connect0:8083
KAFKA_CLUSTERS_1_NAME: secondLocal
KAFKA_CLUSTERS_1_BOOTSTRAPSERVERS: kafka1:29092
KAFKA_CLUSTERS_1_METRICS_PORT: 9998
KAFKA_CLUSTERS_1_SCHEMAREGISTRY: http://schemaregistry1:8085
DYNAMIC_CONFIG_ENABLED: 'true'
kafka0:
image: confluentinc/cp-kafka:7.2.1
hostname: kafka0
container_name: kafka0
ports:
- "9092:9092"
- "9997:9997"
environment:
KAFKA_BROKER_ID: 1
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT'
KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://kafka0:29092,PLAINTEXT_HOST://localhost:9092'
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_JMX_PORT: 9997
KAFKA_JMX_OPTS: -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=kafka0 -Dcom.sun.management.jmxremote.rmi.port=9997
KAFKA_PROCESS_ROLES: 'broker,controller'
KAFKA_NODE_ID: 1
KAFKA_CONTROLLER_QUORUM_VOTERS: '1@kafka0:29093'
KAFKA_LISTENERS: 'PLAINTEXT://kafka0:29092,CONTROLLER://kafka0:29093,PLAINTEXT_HOST://0.0.0.0:9092'
KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT'
KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
KAFKA_LOG_DIRS: '/tmp/kraft-combined-logs'
volumes:
- ./scripts/update_run.sh:/tmp/update_run.sh
command: "bash -c 'if [ ! -f /tmp/update_run.sh ]; then echo \"ERROR: Did you forget the update_run.sh file that came with this docker-compose.yml file?\" && exit 1 ; else /tmp/update_run.sh && /etc/confluent/docker/run ; fi'"
kafka1:
image: confluentinc/cp-kafka:7.2.1
hostname: kafka1
container_name: kafka1
ports:
- "9093:9092"
- "9998:9998"
environment:
KAFKA_BROKER_ID: 1
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT'
KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://kafka1:29092,PLAINTEXT_HOST://localhost:9092'
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_JMX_PORT: 9998
KAFKA_JMX_OPTS: -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=kafka1 -Dcom.sun.management.jmxremote.rmi.port=9998
KAFKA_PROCESS_ROLES: 'broker,controller'
KAFKA_NODE_ID: 1
KAFKA_CONTROLLER_QUORUM_VOTERS: '1@kafka1:29093'
KAFKA_LISTENERS: 'PLAINTEXT://kafka1:29092,CONTROLLER://kafka1:29093,PLAINTEXT_HOST://0.0.0.0:9092'
KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT'
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
KAFKA_LISTENERS: PLAINTEXT://kafka1:9092,CONTROLLER://kafka1:9093
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka1:9092
KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
KAFKA_LOG_DIRS: '/tmp/kraft-combined-logs'
KAFKA_CONTROLLER_QUORUM_VOTERS: '1@kafka1:9093,2@kafka2:9093'
KAFKA_PROCESS_ROLES: 'broker,controller'
volumes:
- ./scripts/update_run.sh:/tmp/update_run.sh
- ./scripts/update_run_cluster.sh:/tmp/update_run.sh
command: "bash -c 'if [ ! -f /tmp/update_run.sh ]; then echo \"ERROR: Did you forget the update_run.sh file that came with this docker-compose.yml file?\" && exit 1 ; else /tmp/update_run.sh && /etc/confluent/docker/run ; fi'"
schemaregistry0:
image: confluentinc/cp-schema-registry:7.2.1
ports:
- 8085:8085
depends_on:
- kafka0
environment:
SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: PLAINTEXT://kafka0:29092
SCHEMA_REGISTRY_KAFKASTORE_SECURITY_PROTOCOL: PLAINTEXT
SCHEMA_REGISTRY_HOST_NAME: schemaregistry0
SCHEMA_REGISTRY_LISTENERS: http://schemaregistry0:8085
SCHEMA_REGISTRY_SCHEMA_REGISTRY_INTER_INSTANCE_PROTOCOL: "http"
SCHEMA_REGISTRY_LOG4J_ROOT_LOGLEVEL: INFO
SCHEMA_REGISTRY_KAFKASTORE_TOPIC: _schemas
schemaregistry1:
image: confluentinc/cp-schema-registry:7.2.1
ports:
- 18085:8085
depends_on:
- kafka1
environment:
SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: PLAINTEXT://kafka1:29092
SCHEMA_REGISTRY_KAFKASTORE_SECURITY_PROTOCOL: PLAINTEXT
SCHEMA_REGISTRY_HOST_NAME: schemaregistry1
SCHEMA_REGISTRY_LISTENERS: http://schemaregistry1:8085
SCHEMA_REGISTRY_SCHEMA_REGISTRY_INTER_INSTANCE_PROTOCOL: "http"
SCHEMA_REGISTRY_LOG4J_ROOT_LOGLEVEL: INFO
SCHEMA_REGISTRY_KAFKASTORE_TOPIC: _schemas
kafka-connect0:
image: confluentinc/cp-kafka-connect:7.2.1
ports:
- 8083:8083
depends_on:
- kafka0
- schemaregistry0
environment:
CONNECT_BOOTSTRAP_SERVERS: kafka0:29092
CONNECT_GROUP_ID: compose-connect-group
CONNECT_CONFIG_STORAGE_TOPIC: _connect_configs
CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
CONNECT_OFFSET_STORAGE_TOPIC: _connect_offset
CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
CONNECT_STATUS_STORAGE_TOPIC: _connect_status
CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: http://schemaregistry0:8085
CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.storage.StringConverter
CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schemaregistry0:8085
CONNECT_INTERNAL_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
CONNECT_INTERNAL_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
CONNECT_REST_ADVERTISED_HOST_NAME: kafka-connect0
CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components"
kafka-init-topics:
kafka2:
image: confluentinc/cp-kafka:7.2.1
container_name: kafka2
environment:
KAFKA_NODE_ID: 2
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
KAFKA_LISTENERS: PLAINTEXT://kafka2:9092,CONTROLLER://kafka2:9093
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka2:9092
KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
KAFKA_CONTROLLER_QUORUM_VOTERS: '1@kafka1:9093,2@kafka2:9093'
KAFKA_PROCESS_ROLES: 'broker,controller'
volumes:
- ./data/message.json:/data/message.json
depends_on:
- kafka1
command: "bash -c 'echo Waiting for Kafka to be ready... && \
cub kafka-ready -b kafka1:29092 1 30 && \
kafka-topics --create --topic second.users --partitions 3 --replication-factor 1 --if-not-exists --bootstrap-server kafka1:29092 && \
kafka-topics --create --topic second.messages --partitions 2 --replication-factor 1 --if-not-exists --bootstrap-server kafka1:29092 && \
kafka-topics --create --topic first.messages --partitions 2 --replication-factor 1 --if-not-exists --bootstrap-server kafka0:29092 && \
kafka-console-producer --bootstrap-server kafka1:29092 --topic second.users < /data/message.json'"
- ./scripts/update_run_cluster.sh:/tmp/update_run.sh
command: "bash -c 'if [ ! -f /tmp/update_run.sh ]; then echo \"ERROR: Did you forget the update_run.sh file that came with this docker-compose.yml file?\" && exit 1 ; else /tmp/update_run.sh && /etc/confluent/docker/run ; fi'"
# kafka0:
# image: confluentinc/cp-kafka:7.2.1
# hostname: kafka0
# container_name: kafka0
# ports:
# - "9092:9092"
# - "9997:9997"
# environment:
# KAFKA_BROKER_ID: 1
# KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT'
# KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://kafka0:29092,PLAINTEXT_HOST://localhost:9092'
# KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
# KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
# KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
# KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
# KAFKA_JMX_PORT: 9997
# KAFKA_JMX_OPTS: -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=kafka0 -Dcom.sun.management.jmxremote.rmi.port=9997
# KAFKA_PROCESS_ROLES: 'broker,controller'
# KAFKA_NODE_ID: 1
# KAFKA_CONTROLLER_QUORUM_VOTERS: '1@kafka0:29093'
# KAFKA_LISTENERS: 'PLAINTEXT://kafka0:29092,CONTROLLER://kafka0:29093,PLAINTEXT_HOST://0.0.0.0:9092'
# KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT'
# KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
# KAFKA_LOG_DIRS: '/tmp/kraft-combined-logs'
# volumes:
# - ./scripts/update_run.sh:/tmp/update_run.sh
# command: "bash -c 'if [ ! -f /tmp/update_run.sh ]; then echo \"ERROR: Did you forget the update_run.sh file that came with this docker-compose.yml file?\" && exit 1 ; else /tmp/update_run.sh && /etc/confluent/docker/run ; fi'"
#
# kafka1:
# image: confluentinc/cp-kafka:7.2.1
# hostname: kafka1
# container_name: kafka1
# ports:
# - "9093:9092"
# - "9998:9998"
# environment:
# KAFKA_BROKER_ID: 1
# KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT'
# KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://kafka1:29092,PLAINTEXT_HOST://localhost:9092'
# KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
# KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
# KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
# KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
# KAFKA_JMX_PORT: 9998
# KAFKA_JMX_OPTS: -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=kafka1 -Dcom.sun.management.jmxremote.rmi.port=9998
# KAFKA_PROCESS_ROLES: 'broker,controller'
# KAFKA_NODE_ID: 1
# KAFKA_CONTROLLER_QUORUM_VOTERS: '1@kafka1:29093'
# KAFKA_LISTENERS: 'PLAINTEXT://kafka1:29092,CONTROLLER://kafka1:29093,PLAINTEXT_HOST://0.0.0.0:9092'
# KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT'
# KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
# KAFKA_LOG_DIRS: '/tmp/kraft-combined-logs'
# volumes:
# - ./scripts/update_run.sh:/tmp/update_run.sh
# command: "bash -c 'if [ ! -f /tmp/update_run.sh ]; then echo \"ERROR: Did you forget the update_run.sh file that came with this docker-compose.yml file?\" && exit 1 ; else /tmp/update_run.sh && /etc/confluent/docker/run ; fi'"
# schemaregistry0:
# image: confluentinc/cp-schema-registry:7.2.1
# ports:
# - 8085:8085
# depends_on:
# - kafka0
# environment:
# SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: PLAINTEXT://kafka0:29092
# SCHEMA_REGISTRY_KAFKASTORE_SECURITY_PROTOCOL: PLAINTEXT
# SCHEMA_REGISTRY_HOST_NAME: schemaregistry0
# SCHEMA_REGISTRY_LISTENERS: http://schemaregistry0:8085
#
# SCHEMA_REGISTRY_SCHEMA_REGISTRY_INTER_INSTANCE_PROTOCOL: "http"
# SCHEMA_REGISTRY_LOG4J_ROOT_LOGLEVEL: INFO
# SCHEMA_REGISTRY_KAFKASTORE_TOPIC: _schemas
#
# schemaregistry1:
# image: confluentinc/cp-schema-registry:7.2.1
# ports:
# - 18085:8085
# depends_on:
# - kafka1
# environment:
# SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: PLAINTEXT://kafka1:29092
# SCHEMA_REGISTRY_KAFKASTORE_SECURITY_PROTOCOL: PLAINTEXT
# SCHEMA_REGISTRY_HOST_NAME: schemaregistry1
# SCHEMA_REGISTRY_LISTENERS: http://schemaregistry1:8085
#
# SCHEMA_REGISTRY_SCHEMA_REGISTRY_INTER_INSTANCE_PROTOCOL: "http"
# SCHEMA_REGISTRY_LOG4J_ROOT_LOGLEVEL: INFO
# SCHEMA_REGISTRY_KAFKASTORE_TOPIC: _schemas
#
# kafka-connect0:
# image: confluentinc/cp-kafka-connect:7.2.1
# ports:
# - 8083:8083
# depends_on:
# - kafka0
# - schemaregistry0
# environment:
# CONNECT_BOOTSTRAP_SERVERS: kafka0:29092
# CONNECT_GROUP_ID: compose-connect-group
# CONNECT_CONFIG_STORAGE_TOPIC: _connect_configs
# CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
# CONNECT_OFFSET_STORAGE_TOPIC: _connect_offset
# CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
# CONNECT_STATUS_STORAGE_TOPIC: _connect_status
# CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
# CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
# CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: http://schemaregistry0:8085
# CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.storage.StringConverter
# CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schemaregistry0:8085
# CONNECT_INTERNAL_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
# CONNECT_INTERNAL_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
# CONNECT_REST_ADVERTISED_HOST_NAME: kafka-connect0
# CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components"
#
# kafka-init-topics:
# image: confluentinc/cp-kafka:7.2.1
# volumes:
# - ./data/message.json:/data/message.json
# depends_on:
# - kafka1
# command: "bash -c 'echo Waiting for Kafka to be ready... && \
# cub kafka-ready -b kafka1:29092 1 30 && \
# kafka-topics --create --topic second.users --partitions 3 --replication-factor 1 --if-not-exists --bootstrap-server kafka1:29092 && \
# kafka-topics --create --topic second.messages --partitions 2 --replication-factor 1 --if-not-exists --bootstrap-server kafka1:29092 && \
# kafka-topics --create --topic first.messages --partitions 2 --replication-factor 1 --if-not-exists --bootstrap-server kafka0:29092 && \
# kafka-console-producer --bootstrap-server kafka1:29092 --topic second.users < /data/message.json'"
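The new compose stands up a two-node KRaft cluster (kafka1 and kafka2 sharing the controller quorum 1@kafka1:9093,2@kafka2:9093) and keeps the previous single-node setup around as comments. A quick way to confirm both brokers joined is to describe the cluster from inside the compose network; a minimal sketch, assuming the kafka1:9092 bootstrap address defined above:

    package sketch;

    import java.util.Map;
    import java.util.concurrent.ExecutionException;
    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.AdminClientConfig;

    public class ClusterCheck {
      public static void main(String[] args) throws ExecutionException, InterruptedException {
        try (AdminClient client = AdminClient.create(
            Map.of(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka1:9092"))) {
          // Expect two entries, one per broker (kafka1 and kafka2).
          client.describeCluster().nodes().get()
              .forEach(n -> System.out.printf("broker %d at %s:%d%n", n.id(), n.host(), n.port()));
        }
      }
    }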

View file

@@ -1,6 +1,9 @@
package com.provectus.kafka.ui;
import com.provectus.kafka.ui.util.DynamicConfigOperations;
import java.util.Map;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.ldap.LdapAutoConfiguration;
import org.springframework.boot.builder.SpringApplicationBuilder;
@@ -14,7 +17,10 @@ import org.springframework.scheduling.annotation.EnableScheduling;
public class KafkaUiApplication {
public static void main(String[] args) {
startApplication(args);
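// Creates an AdminClient against the default local bootstrap server and prints the client instance.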
AdminClient ac = AdminClient.create(Map.of(
AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"
));
System.out.println(ac);
}
public static ConfigurableApplicationContext startApplication(String[] args) {

View file

@@ -0,0 +1,68 @@
package com.provectus.kafka.ui.controller;
import static java.util.stream.Collectors.toMap;
import com.provectus.kafka.ui.api.ClientQuotasApi;
import com.provectus.kafka.ui.model.ClientQuotasDTO;
import com.provectus.kafka.ui.service.audit.AuditService;
import com.provectus.kafka.ui.service.quota.ClientQuotaRecord;
import com.provectus.kafka.ui.service.quota.QuotaService;
import com.provectus.kafka.ui.service.rbac.AccessControlService;
import java.math.BigDecimal;
import java.util.Map;
import java.util.Optional;
import lombok.RequiredArgsConstructor;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@RestController
@RequiredArgsConstructor
public class ClientQuotasController extends AbstractController implements ClientQuotasApi {
private final QuotaService quotaService;
private final AccessControlService accessControlService;
private final AuditService auditService;
@Override
public Mono<ResponseEntity<Flux<ClientQuotasDTO>>> listQuotas(String clusterName,
ServerWebExchange exchange) {
return Mono.just(quotaService.all(getCluster(clusterName)).map(this::map))
.map(ResponseEntity::ok);
}
@Override
public Mono<ResponseEntity<Void>> upsertClientQuotas(String clusterName,
Mono<ClientQuotasDTO> clientQuotasDTO,
ServerWebExchange exchange) {
return clientQuotasDTO.flatMap(
quotas ->
quotaService.upsert(
getCluster(clusterName),
quotas.getUser(),
quotas.getClientId(),
quotas.getIp(),
Optional.ofNullable(quotas.getQuotas()).orElse(Map.of())
.entrySet()
.stream()
.collect(toMap(Map.Entry::getKey, e -> e.getValue().doubleValue()))
)
).map(statusCode -> ResponseEntity.status(statusCode).build());
}
private ClientQuotasDTO map(ClientQuotaRecord quotaRecord) {
return new ClientQuotasDTO()
.user(quotaRecord.user())
.clientId(quotaRecord.clientId())
.ip(quotaRecord.ip())
.quotas(
quotaRecord.quotas().entrySet().stream()
.collect(toMap(Map.Entry::getKey, e -> BigDecimal.valueOf(e.getValue())))
);
}
}
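A usage sketch for the new endpoints (not part of this commit): the base URL and the "local" cluster name are assumptions taken from the compose file above.

    package sketch;

    import java.util.Map;
    import org.springframework.web.reactive.function.client.WebClient;

    public class ClientQuotasSmokeTest {
      public static void main(String[] args) {
        WebClient client = WebClient.create("http://localhost:8080");

        // POST an upsert: expect 201 if the quota entity is new, 204 if it already existed.
        client.post()
            .uri("/api/clusters/{clusterName}/clientquotas", "local")
            .bodyValue(Map.of(
                "user", "app1",
                "quotas", Map.of("producer_byte_rate", 1048576)))
            .retrieve()
            .toBodilessEntity()
            .doOnNext(resp -> System.out.println("upsert status: " + resp.getStatusCode()))
            .block();

        // GET the listing back and print each quota record.
        client.get()
            .uri("/api/clusters/{clusterName}/clientquotas", "local")
            .retrieve()
            .bodyToFlux(Map.class)
            .doOnNext(System.out::println)
            .blockLast();
      }
    }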

View file

@@ -6,5 +6,6 @@ public enum ClusterFeature {
SCHEMA_REGISTRY,
TOPIC_DELETION,
KAFKA_ACL_VIEW,
KAFKA_ACL_EDIT
KAFKA_ACL_EDIT,
CLIENT_QUOTA_MANAGEMENT
}

View file

@@ -1,5 +1,7 @@
package com.provectus.kafka.ui.service;
import static com.provectus.kafka.ui.service.ReactiveAdminClient.SupportedFeature.CLIENT_QUOTA_MANAGEMENT;
import com.provectus.kafka.ui.model.ClusterFeature;
import com.provectus.kafka.ui.model.KafkaCluster;
import com.provectus.kafka.ui.service.ReactiveAdminClient.ClusterDescription;
@@ -41,6 +43,7 @@ public class FeatureService {
features.add(topicDeletionEnabled(adminClient));
features.add(aclView(adminClient));
features.add(aclEdit(adminClient, clusterDescription));
features.add(quotaManagement(adminClient));
return Flux.fromIterable(features).flatMap(m -> m).collectList();
}
@@ -70,4 +73,10 @@ public class FeatureService {
return adminClient.getClusterFeatures().contains(ReactiveAdminClient.SupportedFeature.AUTHORIZED_SECURITY_ENABLED);
}
private Mono<ClusterFeature> quotaManagement(ReactiveAdminClient adminClient) {
return adminClient.getClusterFeatures().contains(CLIENT_QUOTA_MANAGEMENT)
? Mono.just(ClusterFeature.CLIENT_QUOTA_MANAGEMENT)
: Mono.empty();
}
}

View file

@@ -77,6 +77,9 @@ import org.apache.kafka.common.errors.SecurityDisabledException;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.quota.ClientQuotaAlteration;
import org.apache.kafka.common.quota.ClientQuotaEntity;
import org.apache.kafka.common.quota.ClientQuotaFilter;
import org.apache.kafka.common.requests.DescribeLogDirsResponse;
import org.apache.kafka.common.resource.ResourcePatternFilter;
import reactor.core.publisher.Flux;
@@ -94,7 +97,8 @@ public class ReactiveAdminClient implements Closeable {
INCREMENTAL_ALTER_CONFIGS(2.3f),
CONFIG_DOCUMENTATION_RETRIEVAL(2.6f),
DESCRIBE_CLUSTER_INCLUDE_AUTHORIZED_OPERATIONS(2.3f),
AUTHORIZED_SECURITY_ENABLED(ReactiveAdminClient::isAuthorizedSecurityEnabled);
AUTHORIZED_SECURITY_ENABLED(ReactiveAdminClient::isAuthorizedSecurityEnabled),
CLIENT_QUOTA_MANAGEMENT(2.6f);
private final BiFunction<AdminClient, Float, Mono<Boolean>> predicate;
@@ -658,6 +662,14 @@
return toMono(client.alterReplicaLogDirs(replicaAssignment).all());
}
public Mono<Map<ClientQuotaEntity, Map<String, Double>>> getClientQuotas(ClientQuotaFilter filter) {
return toMono(client.describeClientQuotas(filter).entities());
}
public Mono<Void> alterClientQuota(ClientQuotaAlteration alteration) {
return toMono(client.alterClientQuotas(List.of(alteration)).all());
}
private Mono<Void> incrementalAlterConfig(String topicName,
List<ConfigEntry> currentConfigs,
Map<String, String> newConfigs) {
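These two wrappers delegate to the client-quota Admin APIs introduced by KIP-546 in Kafka 2.6, which is why CLIENT_QUOTA_MANAGEMENT above is gated at 2.6f. A standalone sketch of the same calls against a raw AdminClient (the bootstrap address is an assumption):

    package sketch;

    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.ExecutionException;
    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.common.quota.ClientQuotaAlteration;
    import org.apache.kafka.common.quota.ClientQuotaEntity;
    import org.apache.kafka.common.quota.ClientQuotaFilter;

    public class QuotaAdminSketch {
      public static void main(String[] args) throws ExecutionException, InterruptedException {
        try (AdminClient client = AdminClient.create(
            Map.of(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"))) {

          // Set a 1 MiB/s producer byte-rate quota for user "app1".
          ClientQuotaEntity entity = new ClientQuotaEntity(Map.of(ClientQuotaEntity.USER, "app1"));
          ClientQuotaAlteration alteration = new ClientQuotaAlteration(
              entity, List.of(new ClientQuotaAlteration.Op("producer_byte_rate", 1048576.0)));
          client.alterClientQuotas(List.of(alteration)).all().get();

          // Read back every quota defined on the cluster.
          Map<ClientQuotaEntity, Map<String, Double>> quotas =
              client.describeClientQuotas(ClientQuotaFilter.all()).entities().get();
          quotas.forEach((e, q) -> System.out.println(e + " -> " + q));
        }
      }
    }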

View file

@@ -0,0 +1,20 @@
package com.provectus.kafka.ui.service.quota;
import jakarta.annotation.Nullable;
import java.util.Map;
import org.apache.kafka.common.quota.ClientQuotaEntity;
public record ClientQuotaRecord(@Nullable String user,
@Nullable String clientId,
@Nullable String ip,
Map<String, Double> quotas) {
static ClientQuotaRecord create(ClientQuotaEntity entity, Map<String, Double> quotas) {
return new ClientQuotaRecord(
entity.entries().get(ClientQuotaEntity.USER),
entity.entries().get(ClientQuotaEntity.CLIENT_ID),
entity.entries().get(ClientQuotaEntity.IP),
quotas
);
}
}
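Illustratively, an entity that names both a user and a client id maps into the record like this (a sketch placed in the same package, since create is package-private):

    package com.provectus.kafka.ui.service.quota;

    import java.util.Map;
    import org.apache.kafka.common.quota.ClientQuotaEntity;

    class ClientQuotaRecordExample {
      public static void main(String[] args) {
        ClientQuotaEntity entity = new ClientQuotaEntity(Map.of(
            ClientQuotaEntity.USER, "app1",
            ClientQuotaEntity.CLIENT_ID, "producer-1"));
        // Entries absent from the entity come back as null in the record.
        ClientQuotaRecord record = ClientQuotaRecord.create(entity, Map.of("producer_byte_rate", 1048576.0));
        System.out.println(record); // user=app1, clientId=producer-1, ip=null
      }
    }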

View file

@@ -0,0 +1,39 @@
package com.provectus.kafka.ui.service.quota;
import com.provectus.kafka.ui.model.KafkaCluster;
import com.provectus.kafka.ui.service.AdminClientService;
import jakarta.annotation.Nullable;
import java.math.BigDecimal;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import lombok.RequiredArgsConstructor;
import org.apache.kafka.common.quota.ClientQuotaAlteration;
import org.apache.kafka.common.quota.ClientQuotaEntity;
import org.apache.kafka.common.quota.ClientQuotaFilter;
import org.springframework.http.HttpStatusCode;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@Service
@RequiredArgsConstructor
public class QuotaService {
private final AdminClientService adminClientService;
public Flux<ClientQuotaRecord> all(KafkaCluster cluster) {
return adminClientService.get(cluster)
.flatMap(ac -> ac.getClientQuotas(ClientQuotaFilter.all()))
.flatMapIterable(map ->
map.entrySet().stream().map(e -> ClientQuotaRecord.create(e.getKey(), e.getValue())).toList());
}
//returns 201 if a new entity was created, 204 if an existing one was updated
public Mono<HttpStatusCode> upsert(KafkaCluster cluster,
                                   @Nullable String user,
                                   @Nullable String clientId,
                                   @Nullable String ip,
                                   Map<String, Double> quotas) {
  // Sketch (assumed implementation, the method body was empty): check whether the
  // entity already exists, apply the alteration, then report 201 (created) or 204 (updated).
  ClientQuotaEntity entity = quotaEntity(user, clientId, ip);
  ClientQuotaAlteration alteration = new ClientQuotaAlteration(
      entity,
      quotas.entrySet().stream()
          .map(e -> new ClientQuotaAlteration.Op(e.getKey(), e.getValue()))
          .toList());
  return adminClientService.get(cluster)
      .flatMap(ac -> ac.getClientQuotas(ClientQuotaFilter.all())
          .map(existing -> existing.containsKey(entity))
          .flatMap(existed -> ac.alterClientQuota(alteration)
              .thenReturn(existed ? HttpStatusCode.valueOf(204) : HttpStatusCode.valueOf(201))));
}

private static ClientQuotaEntity quotaEntity(@Nullable String user,
                                             @Nullable String clientId,
                                             @Nullable String ip) {
  Map<String, String> entries = new HashMap<>();
  Optional.ofNullable(user).ifPresent(u -> entries.put(ClientQuotaEntity.USER, u));
  Optional.ofNullable(clientId).ifPresent(cid -> entries.put(ClientQuotaEntity.CLIENT_ID, cid));
  Optional.ofNullable(ip).ifPresent(i -> entries.put(ClientQuotaEntity.IP, i));
  return new ClientQuotaEntity(entries);
}
}
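The quota keys passed through here are Kafka's standard client quota configs, e.g. producer_byte_rate, consumer_byte_rate, and request_percentage. A hypothetical call site (assumes quotaService and a resolved cluster in scope):

    // Cap user "app1" at 1 MiB/s produce and 2 MiB/s fetch; the returned status
    // code reports whether the quota entity was created (201) or updated (204).
    Mono<HttpStatusCode> status = quotaService.upsert(
        cluster, "app1", null, null,
        Map.of("producer_byte_rate", 1_048_576.0,
            "consumer_byte_rate", 2_097_152.0));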

View file

@@ -1910,6 +1910,50 @@ paths:
200:
description: OK
/api/clusters/{clusterName}/clientquotas:
get:
tags:
- ClientQuotas
summary: listQuotas
operationId: listQuotas
parameters:
- name: clusterName
in: path
required: true
schema:
type: string
responses:
200:
description: OK
content:
application/json:
schema:
type: array
items:
$ref: '#/components/schemas/ClientQuotas'
post:
tags:
- ClientQuotas
summary: upsertClientQuotas
operationId: upsertClientQuotas
parameters:
- name: clusterName
in: path
required: true
schema:
type: string
requestBody:
content:
application/json:
schema:
$ref: '#/components/schemas/ClientQuotas'
responses:
201:
description: Quota created
204:
description: Existing quota updated
/api/clusters/{clusterName}/acl/streamApp:
post:
tags:
@@ -2175,6 +2219,7 @@ components:
- TOPIC_DELETION
- KAFKA_ACL_VIEW # get ACLs listing
- KAFKA_ACL_EDIT # create & delete ACLs
- CLIENT_QUOTA_MANAGEMENT
required:
- id
- name
@@ -3701,6 +3746,20 @@ components:
nullable: false
type: string
ClientQuotas:
type: object
properties:
user:
type: string
clientId:
type: string
ip:
type: string
quotas:
type: object
additionalProperties:
type: number
KafkaAclResourceType:
type: string
enum: