diff --git a/documentation/compose/kafka-ui.yaml b/documentation/compose/kafka-ui.yaml index 14a269ca7c..bf55869685 100644 --- a/documentation/compose/kafka-ui.yaml +++ b/documentation/compose/kafka-ui.yaml @@ -8,146 +8,165 @@ services: ports: - 8080:8080 depends_on: - - kafka0 + - kafka2 - kafka1 - - schemaregistry0 - - schemaregistry1 - - kafka-connect0 environment: KAFKA_CLUSTERS_0_NAME: local - KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka0:29092 + KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka1:9092 KAFKA_CLUSTERS_0_METRICS_PORT: 9997 - KAFKA_CLUSTERS_0_SCHEMAREGISTRY: http://schemaregistry0:8085 - KAFKA_CLUSTERS_0_KAFKACONNECT_0_NAME: first - KAFKA_CLUSTERS_0_KAFKACONNECT_0_ADDRESS: http://kafka-connect0:8083 - KAFKA_CLUSTERS_1_NAME: secondLocal - KAFKA_CLUSTERS_1_BOOTSTRAPSERVERS: kafka1:29092 - KAFKA_CLUSTERS_1_METRICS_PORT: 9998 - KAFKA_CLUSTERS_1_SCHEMAREGISTRY: http://schemaregistry1:8085 - DYNAMIC_CONFIG_ENABLED: 'true' - - kafka0: - image: confluentinc/cp-kafka:7.2.1 - hostname: kafka0 - container_name: kafka0 - ports: - - "9092:9092" - - "9997:9997" - environment: - KAFKA_BROKER_ID: 1 - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT' - KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://kafka0:29092,PLAINTEXT_HOST://localhost:9092' - KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 - KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0 - KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1 - KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1 - KAFKA_JMX_PORT: 9997 - KAFKA_JMX_OPTS: -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=kafka0 -Dcom.sun.management.jmxremote.rmi.port=9997 - KAFKA_PROCESS_ROLES: 'broker,controller' - KAFKA_NODE_ID: 1 - KAFKA_CONTROLLER_QUORUM_VOTERS: '1@kafka0:29093' - KAFKA_LISTENERS: 'PLAINTEXT://kafka0:29092,CONTROLLER://kafka0:29093,PLAINTEXT_HOST://0.0.0.0:9092' - KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT' - KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER' - KAFKA_LOG_DIRS: '/tmp/kraft-combined-logs' - volumes: - - ./scripts/update_run.sh:/tmp/update_run.sh - command: "bash -c 'if [ ! -f /tmp/update_run.sh ]; then echo \"ERROR: Did you forget the update_run.sh file that came with this docker-compose.yml file?\" && exit 1 ; else /tmp/update_run.sh && /etc/confluent/docker/run ; fi'" kafka1: image: confluentinc/cp-kafka:7.2.1 - hostname: kafka1 container_name: kafka1 - ports: - - "9093:9092" - - "9998:9998" environment: - KAFKA_BROKER_ID: 1 - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT' - KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://kafka1:29092,PLAINTEXT_HOST://localhost:9092' - KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 - KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0 - KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1 - KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1 - KAFKA_JMX_PORT: 9998 - KAFKA_JMX_OPTS: -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=kafka0 -Dcom.sun.management.jmxremote.rmi.port=9998 - KAFKA_PROCESS_ROLES: 'broker,controller' KAFKA_NODE_ID: 1 - KAFKA_CONTROLLER_QUORUM_VOTERS: '1@kafka1:29093' - KAFKA_LISTENERS: 'PLAINTEXT://kafka1:29092,CONTROLLER://kafka1:29093,PLAINTEXT_HOST://0.0.0.0:9092' - KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT' + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT + KAFKA_LISTENERS: PLAINTEXT://kafka1:9092,CONTROLLER://kafka1:9093 + KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka1:9092 KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER' - KAFKA_LOG_DIRS: '/tmp/kraft-combined-logs' + KAFKA_CONTROLLER_QUORUM_VOTERS: '1@kafka1:9093,2@kafka2:9093' + KAFKA_PROCESS_ROLES: 'broker,controller' volumes: - - ./scripts/update_run.sh:/tmp/update_run.sh + - ./scripts/update_run_cluster.sh:/tmp/update_run.sh command: "bash -c 'if [ ! -f /tmp/update_run.sh ]; then echo \"ERROR: Did you forget the update_run.sh file that came with this docker-compose.yml file?\" && exit 1 ; else /tmp/update_run.sh && /etc/confluent/docker/run ; fi'" - schemaregistry0: - image: confluentinc/cp-schema-registry:7.2.1 - ports: - - 8085:8085 - depends_on: - - kafka0 - environment: - SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: PLAINTEXT://kafka0:29092 - SCHEMA_REGISTRY_KAFKASTORE_SECURITY_PROTOCOL: PLAINTEXT - SCHEMA_REGISTRY_HOST_NAME: schemaregistry0 - SCHEMA_REGISTRY_LISTENERS: http://schemaregistry0:8085 - - SCHEMA_REGISTRY_SCHEMA_REGISTRY_INTER_INSTANCE_PROTOCOL: "http" - SCHEMA_REGISTRY_LOG4J_ROOT_LOGLEVEL: INFO - SCHEMA_REGISTRY_KAFKASTORE_TOPIC: _schemas - - schemaregistry1: - image: confluentinc/cp-schema-registry:7.2.1 - ports: - - 18085:8085 - depends_on: - - kafka1 - environment: - SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: PLAINTEXT://kafka1:29092 - SCHEMA_REGISTRY_KAFKASTORE_SECURITY_PROTOCOL: PLAINTEXT - SCHEMA_REGISTRY_HOST_NAME: schemaregistry1 - SCHEMA_REGISTRY_LISTENERS: http://schemaregistry1:8085 - - SCHEMA_REGISTRY_SCHEMA_REGISTRY_INTER_INSTANCE_PROTOCOL: "http" - SCHEMA_REGISTRY_LOG4J_ROOT_LOGLEVEL: INFO - SCHEMA_REGISTRY_KAFKASTORE_TOPIC: _schemas - - kafka-connect0: - image: confluentinc/cp-kafka-connect:7.2.1 - ports: - - 8083:8083 - depends_on: - - kafka0 - - schemaregistry0 - environment: - CONNECT_BOOTSTRAP_SERVERS: kafka0:29092 - CONNECT_GROUP_ID: compose-connect-group - CONNECT_CONFIG_STORAGE_TOPIC: _connect_configs - CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1 - CONNECT_OFFSET_STORAGE_TOPIC: _connect_offset - CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1 - CONNECT_STATUS_STORAGE_TOPIC: _connect_status - CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1 - CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter - CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: http://schemaregistry0:8085 - CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.storage.StringConverter - CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schemaregistry0:8085 - CONNECT_INTERNAL_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter - CONNECT_INTERNAL_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter - CONNECT_REST_ADVERTISED_HOST_NAME: kafka-connect0 - CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components" - - kafka-init-topics: + kafka2: image: confluentinc/cp-kafka:7.2.1 + container_name: kafka2 + environment: + KAFKA_NODE_ID: 2 + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT + KAFKA_LISTENERS: PLAINTEXT://kafka2:9092,CONTROLLER://kafka2:9093 + KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka2:9092 + KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER' + KAFKA_CONTROLLER_QUORUM_VOTERS: '1@kafka1:9093,2@kafka2:9093' + KAFKA_PROCESS_ROLES: 'broker,controller' volumes: - - ./data/message.json:/data/message.json - depends_on: - - kafka1 - command: "bash -c 'echo Waiting for Kafka to be ready... && \ - cub kafka-ready -b kafka1:29092 1 30 && \ - kafka-topics --create --topic second.users --partitions 3 --replication-factor 1 --if-not-exists --bootstrap-server kafka1:29092 && \ - kafka-topics --create --topic second.messages --partitions 2 --replication-factor 1 --if-not-exists --bootstrap-server kafka1:29092 && \ - kafka-topics --create --topic first.messages --partitions 2 --replication-factor 1 --if-not-exists --bootstrap-server kafka0:29092 && \ - kafka-console-producer --bootstrap-server kafka1:29092 -topic second.users < /data/message.json'" + - ./scripts/update_run_cluster.sh:/tmp/update_run.sh + command: "bash -c 'if [ ! -f /tmp/update_run.sh ]; then echo \"ERROR: Did you forget the update_run.sh file that came with this docker-compose.yml file?\" && exit 1 ; else /tmp/update_run.sh && /etc/confluent/docker/run ; fi'" + +# kafka0: +# image: confluentinc/cp-kafka:7.2.1 +# hostname: kafka0 +# container_name: kafka0 +# ports: +# - "9092:9092" +# - "9997:9997" +# environment: +# KAFKA_BROKER_ID: 1 +# KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT' +# KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://kafka0:29092,PLAINTEXT_HOST://localhost:9092' +# KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 +# KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0 +# KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1 +# KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1 +# KAFKA_JMX_PORT: 9997 +# KAFKA_JMX_OPTS: -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=kafka0 -Dcom.sun.management.jmxremote.rmi.port=9997 +# KAFKA_PROCESS_ROLES: 'broker,controller' +# KAFKA_NODE_ID: 1 +# KAFKA_CONTROLLER_QUORUM_VOTERS: '1@kafka0:29093' +# KAFKA_LISTENERS: 'PLAINTEXT://kafka0:29092,CONTROLLER://kafka0:29093,PLAINTEXT_HOST://0.0.0.0:9092' +# KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT' +# KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER' +# KAFKA_LOG_DIRS: '/tmp/kraft-combined-logs' +# volumes: +# - ./scripts/update_run.sh:/tmp/update_run.sh +# command: "bash -c 'if [ ! -f /tmp/update_run.sh ]; then echo \"ERROR: Did you forget the update_run.sh file that came with this docker-compose.yml file?\" && exit 1 ; else /tmp/update_run.sh && /etc/confluent/docker/run ; fi'" +# +# kafka1: +# image: confluentinc/cp-kafka:7.2.1 +# hostname: kafka1 +# container_name: kafka1 +# ports: +# - "9093:9092" +# - "9998:9998" +# environment: +# KAFKA_BROKER_ID: 1 +# KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT' +# KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://kafka1:29092,PLAINTEXT_HOST://localhost:9092' +# KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 +# KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0 +# KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1 +# KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1 +# KAFKA_JMX_PORT: 9998 +# KAFKA_JMX_OPTS: -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=kafka0 -Dcom.sun.management.jmxremote.rmi.port=9998 +# KAFKA_PROCESS_ROLES: 'broker,controller' +# KAFKA_NODE_ID: 1 +# KAFKA_CONTROLLER_QUORUM_VOTERS: '1@kafka1:29093' +# KAFKA_LISTENERS: 'PLAINTEXT://kafka1:29092,CONTROLLER://kafka1:29093,PLAINTEXT_HOST://0.0.0.0:9092' +# KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT' +# KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER' +# KAFKA_LOG_DIRS: '/tmp/kraft-combined-logs' +# volumes: +# - ./scripts/update_run.sh:/tmp/update_run.sh +# command: "bash -c 'if [ ! -f /tmp/update_run.sh ]; then echo \"ERROR: Did you forget the update_run.sh file that came with this docker-compose.yml file?\" && exit 1 ; else /tmp/update_run.sh && /etc/confluent/docker/run ; fi'" + +# schemaregistry0: +# image: confluentinc/cp-schema-registry:7.2.1 +# ports: +# - 8085:8085 +# depends_on: +# - kafka0 +# environment: +# SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: PLAINTEXT://kafka0:29092 +# SCHEMA_REGISTRY_KAFKASTORE_SECURITY_PROTOCOL: PLAINTEXT +# SCHEMA_REGISTRY_HOST_NAME: schemaregistry0 +# SCHEMA_REGISTRY_LISTENERS: http://schemaregistry0:8085 +# +# SCHEMA_REGISTRY_SCHEMA_REGISTRY_INTER_INSTANCE_PROTOCOL: "http" +# SCHEMA_REGISTRY_LOG4J_ROOT_LOGLEVEL: INFO +# SCHEMA_REGISTRY_KAFKASTORE_TOPIC: _schemas +# +# schemaregistry1: +# image: confluentinc/cp-schema-registry:7.2.1 +# ports: +# - 18085:8085 +# depends_on: +# - kafka1 +# environment: +# SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: PLAINTEXT://kafka1:29092 +# SCHEMA_REGISTRY_KAFKASTORE_SECURITY_PROTOCOL: PLAINTEXT +# SCHEMA_REGISTRY_HOST_NAME: schemaregistry1 +# SCHEMA_REGISTRY_LISTENERS: http://schemaregistry1:8085 +# +# SCHEMA_REGISTRY_SCHEMA_REGISTRY_INTER_INSTANCE_PROTOCOL: "http" +# SCHEMA_REGISTRY_LOG4J_ROOT_LOGLEVEL: INFO +# SCHEMA_REGISTRY_KAFKASTORE_TOPIC: _schemas +# +# kafka-connect0: +# image: confluentinc/cp-kafka-connect:7.2.1 +# ports: +# - 8083:8083 +# depends_on: +# - kafka0 +# - schemaregistry0 +# environment: +# CONNECT_BOOTSTRAP_SERVERS: kafka0:29092 +# CONNECT_GROUP_ID: compose-connect-group +# CONNECT_CONFIG_STORAGE_TOPIC: _connect_configs +# CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1 +# CONNECT_OFFSET_STORAGE_TOPIC: _connect_offset +# CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1 +# CONNECT_STATUS_STORAGE_TOPIC: _connect_status +# CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1 +# CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter +# CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: http://schemaregistry0:8085 +# CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.storage.StringConverter +# CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schemaregistry0:8085 +# CONNECT_INTERNAL_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter +# CONNECT_INTERNAL_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter +# CONNECT_REST_ADVERTISED_HOST_NAME: kafka-connect0 +# CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components" +# +# kafka-init-topics: +# image: confluentinc/cp-kafka:7.2.1 +# volumes: +# - ./data/message.json:/data/message.json +# depends_on: +# - kafka1 +# command: "bash -c 'echo Waiting for Kafka to be ready... && \ +# cub kafka-ready -b kafka1:29092 1 30 && \ +# kafka-topics --create --topic second.users --partitions 3 --replication-factor 1 --if-not-exists --bootstrap-server kafka1:29092 && \ +# kafka-topics --create --topic second.messages --partitions 2 --replication-factor 1 --if-not-exists --bootstrap-server kafka1:29092 && \ +# kafka-topics --create --topic first.messages --partitions 2 --replication-factor 1 --if-not-exists --bootstrap-server kafka0:29092 && \ +# kafka-console-producer --bootstrap-server kafka1:29092 -topic second.users < /data/message.json'" diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/KafkaUiApplication.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/KafkaUiApplication.java index 8d0eafeff3..a7f0db0437 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/KafkaUiApplication.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/KafkaUiApplication.java @@ -1,6 +1,9 @@ package com.provectus.kafka.ui; import com.provectus.kafka.ui.util.DynamicConfigOperations; +import java.util.Map; +import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.admin.AdminClientConfig; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.autoconfigure.ldap.LdapAutoConfiguration; import org.springframework.boot.builder.SpringApplicationBuilder; @@ -14,7 +17,10 @@ import org.springframework.scheduling.annotation.EnableScheduling; public class KafkaUiApplication { public static void main(String[] args) { - startApplication(args); + AdminClient ac = AdminClient.create(Map.of( + AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092" + )); + System.out.println(ac); } public static ConfigurableApplicationContext startApplication(String[] args) { diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/ClientQuotasController.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/ClientQuotasController.java new file mode 100644 index 0000000000..709e82bf94 --- /dev/null +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/ClientQuotasController.java @@ -0,0 +1,68 @@ +package com.provectus.kafka.ui.controller; + +import static java.util.stream.Collectors.toMap; + +import com.provectus.kafka.ui.api.ClientQuotasApi; +import com.provectus.kafka.ui.model.ClientQuotasDTO; +import com.provectus.kafka.ui.service.audit.AuditService; +import com.provectus.kafka.ui.service.quota.ClientQuotaRecord; +import com.provectus.kafka.ui.service.quota.QuotaService; +import com.provectus.kafka.ui.service.rbac.AccessControlService; +import java.math.BigDecimal; +import java.util.Map; +import java.util.Optional; +import java.util.stream.Collectors; +import lombok.RequiredArgsConstructor; +import org.springframework.http.ResponseEntity; +import org.springframework.web.bind.annotation.RestController; +import org.springframework.web.server.ServerWebExchange; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +@RestController +@RequiredArgsConstructor +public class ClientQuotasController extends AbstractController implements ClientQuotasApi { + + private final QuotaService quotaService; + private final AccessControlService accessControlService; + private final AuditService auditService; + + @Override + public Mono>> listQuotas(String clusterName, + ServerWebExchange exchange) { + return Mono.just(quotaService.all(getCluster(clusterName)).map(this::map)) + .map(ResponseEntity::ok); + } + + @Override + public Mono> upsertClientQuotas(String clusterName, + Mono clientQuotasDTO, + ServerWebExchange exchange) { + + return clientQuotasDTO.flatMap( + quotas -> + quotaService.upsert( + getCluster(clusterName), + quotas.getUser(), + quotas.getClientId(), + quotas.getIp(), + Optional.ofNullable(quotas.getQuotas()).orElse(Map.of()) + .entrySet() + .stream() + .collect(toMap(Map.Entry::getKey, e -> e.getValue().doubleValue())) + ) + ).map(statusCode -> ResponseEntity.status(statusCode).build()); + } + + private ClientQuotasDTO map(ClientQuotaRecord quotaRecord) { + return new ClientQuotasDTO() + .user(quotaRecord.user()) + .clientId(quotaRecord.clientId()) + .ip(quotaRecord.ip()) + .quotas( + quotaRecord.quotas().entrySet().stream() + .collect(toMap(Map.Entry::getKey, e -> BigDecimal.valueOf(e.getValue()))) + ); + } + +} diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/ClusterFeature.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/ClusterFeature.java index 2973e5500d..fec21b40db 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/ClusterFeature.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/ClusterFeature.java @@ -6,5 +6,6 @@ public enum ClusterFeature { SCHEMA_REGISTRY, TOPIC_DELETION, KAFKA_ACL_VIEW, - KAFKA_ACL_EDIT + KAFKA_ACL_EDIT, + CLIENT_QUOTA_MANAGEMENT } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/FeatureService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/FeatureService.java index b08691aef5..f51054b5c5 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/FeatureService.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/FeatureService.java @@ -1,5 +1,7 @@ package com.provectus.kafka.ui.service; +import static com.provectus.kafka.ui.service.ReactiveAdminClient.SupportedFeature.CLIENT_QUOTA_MANAGEMENT; + import com.provectus.kafka.ui.model.ClusterFeature; import com.provectus.kafka.ui.model.KafkaCluster; import com.provectus.kafka.ui.service.ReactiveAdminClient.ClusterDescription; @@ -41,6 +43,7 @@ public class FeatureService { features.add(topicDeletionEnabled(adminClient)); features.add(aclView(adminClient)); features.add(aclEdit(adminClient, clusterDescription)); + features.add(quotaManagement(adminClient)); return Flux.fromIterable(features).flatMap(m -> m).collectList(); } @@ -70,4 +73,10 @@ public class FeatureService { return adminClient.getClusterFeatures().contains(ReactiveAdminClient.SupportedFeature.AUTHORIZED_SECURITY_ENABLED); } + private Mono quotaManagement(ReactiveAdminClient adminClient) { + return adminClient.getClusterFeatures().contains(CLIENT_QUOTA_MANAGEMENT) + ? Mono.just(ClusterFeature.CLIENT_QUOTA_MANAGEMENT) + : Mono.empty(); + } + } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ReactiveAdminClient.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ReactiveAdminClient.java index 9de908efa7..70c1e3ff1c 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ReactiveAdminClient.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ReactiveAdminClient.java @@ -77,6 +77,9 @@ import org.apache.kafka.common.errors.SecurityDisabledException; import org.apache.kafka.common.errors.TopicAuthorizationException; import org.apache.kafka.common.errors.UnknownTopicOrPartitionException; import org.apache.kafka.common.errors.UnsupportedVersionException; +import org.apache.kafka.common.quota.ClientQuotaAlteration; +import org.apache.kafka.common.quota.ClientQuotaEntity; +import org.apache.kafka.common.quota.ClientQuotaFilter; import org.apache.kafka.common.requests.DescribeLogDirsResponse; import org.apache.kafka.common.resource.ResourcePatternFilter; import reactor.core.publisher.Flux; @@ -94,7 +97,8 @@ public class ReactiveAdminClient implements Closeable { INCREMENTAL_ALTER_CONFIGS(2.3f), CONFIG_DOCUMENTATION_RETRIEVAL(2.6f), DESCRIBE_CLUSTER_INCLUDE_AUTHORIZED_OPERATIONS(2.3f), - AUTHORIZED_SECURITY_ENABLED(ReactiveAdminClient::isAuthorizedSecurityEnabled); + AUTHORIZED_SECURITY_ENABLED(ReactiveAdminClient::isAuthorizedSecurityEnabled), + CLIENT_QUOTA_MANAGEMENT(2.6f); private final BiFunction> predicate; @@ -658,6 +662,14 @@ public class ReactiveAdminClient implements Closeable { return toMono(client.alterReplicaLogDirs(replicaAssignment).all()); } + public Mono>> getClientQuotas(ClientQuotaFilter filter) { + return toMono(client.describeClientQuotas(filter).entities()); + } + + public Mono alterClientQuota(ClientQuotaAlteration alteration) { + return toMono(client.alterClientQuotas(List.of(alteration)).all()); + } + private Mono incrementalAlterConfig(String topicName, List currentConfigs, Map newConfigs) { diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/quota/ClientQuotaRecord.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/quota/ClientQuotaRecord.java new file mode 100644 index 0000000000..8dc0e49e45 --- /dev/null +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/quota/ClientQuotaRecord.java @@ -0,0 +1,20 @@ +package com.provectus.kafka.ui.service.quota; + +import jakarta.annotation.Nullable; +import java.util.Map; +import org.apache.kafka.common.quota.ClientQuotaEntity; + +public record ClientQuotaRecord(@Nullable String user, + @Nullable String clientId, + @Nullable String ip, + Map quotas) { + + static ClientQuotaRecord create(ClientQuotaEntity entity, Map qoutas) { + return new ClientQuotaRecord( + entity.entries().get(ClientQuotaEntity.USER), + entity.entries().get(ClientQuotaEntity.CLIENT_ID), + entity.entries().get(ClientQuotaEntity.IP), + qoutas + ); + } +} diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/quota/QuotaService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/quota/QuotaService.java new file mode 100644 index 0000000000..3ff8ac5f69 --- /dev/null +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/quota/QuotaService.java @@ -0,0 +1,39 @@ +package com.provectus.kafka.ui.service.quota; + +import com.provectus.kafka.ui.model.KafkaCluster; +import com.provectus.kafka.ui.service.AdminClientService; +import jakarta.annotation.Nullable; +import java.math.BigDecimal; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import lombok.RequiredArgsConstructor; +import org.apache.kafka.common.quota.ClientQuotaEntity; +import org.apache.kafka.common.quota.ClientQuotaFilter; +import org.springframework.http.HttpStatusCode; +import org.springframework.stereotype.Service; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +@Service +@RequiredArgsConstructor +public class QuotaService { + + private final AdminClientService adminClientService; + + public Flux all(KafkaCluster cluster) { + return adminClientService.get(cluster) + .flatMap(ac -> ac.getClientQuotas(ClientQuotaFilter.all())) + .flatMapIterable(map -> + map.entrySet().stream().map(e -> ClientQuotaRecord.create(e.getKey(), e.getValue())).toList()); + } + + //returns 201 is new entity was created, 204 if exsiting was updated + public Mono upsert(KafkaCluster cluster, + @Nullable String user, + @Nullable String clientId, + @Nullable String ip, + Map quotas) { + + } +} diff --git a/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml b/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml index 9484948b67..8f68bb049f 100644 --- a/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml +++ b/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml @@ -1910,6 +1910,50 @@ paths: 200: description: OK + /api/clusters/{clusterName}/clientquotas: + get: + tags: + - ClientQuotas + summary: listQuotas + operationId: listQuotas + parameters: + - name: clusterName + in: path + required: true + schema: + type: string + responses: + 200: + description: OK + content: + application/json: + schema: + type: array + items: + $ref: '#/components/schemas/ClientQuotas' + post: + tags: + - ClientQuotas + summary: upsertClientQuotas + operationId: upsertClientQuotas + parameters: + - name: clusterName + in: path + required: true + schema: + type: string + requestBody: + content: + application/json: + schema: + $ref: '#/components/schemas/ClientQuotas' + responses: + 201: + description: Quota created + 204: + description: Existing quota updated + + /api/clusters/{clusterName}/acl/streamApp: post: tags: @@ -2175,6 +2219,7 @@ components: - TOPIC_DELETION - KAFKA_ACL_VIEW # get ACLs listing - KAFKA_ACL_EDIT # create & delete ACLs + - CLIENT_QUOTA_MANAGEMENT required: - id - name @@ -3701,6 +3746,20 @@ components: nullable: false type: string + ClientQuotas: + type: object + properties: + user: + type: string + clientId: + type: string + ip: + type: string + quotas: + type: object + additionalProperties: + type: number + KafkaAclResourceType: type: string enum: