diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/BrokersController.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/BrokersController.java index 0678e414c3..d1e90a545e 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/BrokersController.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/BrokersController.java @@ -2,6 +2,7 @@ package com.provectus.kafka.ui.controller; import com.provectus.kafka.ui.api.BrokersApi; import com.provectus.kafka.ui.model.Broker; +import com.provectus.kafka.ui.model.BrokerConfig; import com.provectus.kafka.ui.model.BrokerMetrics; import com.provectus.kafka.ui.model.BrokersLogdirs; import com.provectus.kafka.ui.service.ClusterService; @@ -41,4 +42,13 @@ public class BrokersController implements BrokersApi { ) { return Mono.just(ResponseEntity.ok(clusterService.getAllBrokersLogdirs(clusterName, brokers))); } + + @Override + public Mono>> getBrokerConfig(String clusterName, Integer id, + ServerWebExchange exchange) { + return clusterService.getBrokerConfig(clusterName, id) + .map(Flux::fromIterable) + .map(ResponseEntity::ok) + .onErrorReturn(ResponseEntity.notFound().build()); + } } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/mapper/ClusterMapper.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/mapper/ClusterMapper.java index 321a6c0871..380fe7d853 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/mapper/ClusterMapper.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/mapper/ClusterMapper.java @@ -1,6 +1,7 @@ package com.provectus.kafka.ui.mapper; import com.provectus.kafka.ui.config.ClustersProperties; +import com.provectus.kafka.ui.model.BrokerConfig; import com.provectus.kafka.ui.model.BrokerDiskUsage; import com.provectus.kafka.ui.model.BrokerMetrics; import com.provectus.kafka.ui.model.Cluster; @@ -8,8 +9,11 @@ import com.provectus.kafka.ui.model.ClusterMetrics; import com.provectus.kafka.ui.model.ClusterStats; import com.provectus.kafka.ui.model.CompatibilityCheckResponse; import com.provectus.kafka.ui.model.CompatibilityLevel; +import com.provectus.kafka.ui.model.ConfigSource; +import com.provectus.kafka.ui.model.ConfigSynonym; import com.provectus.kafka.ui.model.Connect; import com.provectus.kafka.ui.model.Feature; +import com.provectus.kafka.ui.model.InternalBrokerConfig; import com.provectus.kafka.ui.model.InternalBrokerDiskUsage; import com.provectus.kafka.ui.model.InternalBrokerMetrics; import com.provectus.kafka.ui.model.InternalClusterMetrics; @@ -33,6 +37,7 @@ import java.util.List; import java.util.Map; import java.util.Properties; import java.util.stream.Collectors; +import org.apache.kafka.clients.admin.ConfigEntry; import org.mapstruct.Mapper; import org.mapstruct.Mapping; @@ -62,6 +67,25 @@ public interface ClusterMapper { BrokerMetrics toBrokerMetrics(InternalBrokerMetrics metrics); + @Mapping(target = "isSensitive", source = "sensitive") + @Mapping(target = "isReadOnly", source = "readOnly") + BrokerConfig toBrokerConfig(InternalBrokerConfig config); + + default ConfigSynonym toConfigSynonym(ConfigEntry.ConfigSynonym config) { + if (config == null) { + return null; + } + + ConfigSynonym configSynonym = new ConfigSynonym(); + configSynonym.setName(config.name()); + configSynonym.setValue(config.value()); + if (config.source() != null) { + configSynonym.setSource(ConfigSource.valueOf(config.source().name())); + } + + return configSynonym; + } + Topic toTopic(InternalTopic topic); Partition toPartition(InternalPartition topic); diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/InternalBrokerConfig.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/InternalBrokerConfig.java new file mode 100644 index 0000000000..b159b39696 --- /dev/null +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/InternalBrokerConfig.java @@ -0,0 +1,18 @@ +package com.provectus.kafka.ui.model; + + +import java.util.List; +import lombok.Builder; +import lombok.Data; +import org.apache.kafka.clients.admin.ConfigEntry; + +@Data +@Builder +public class InternalBrokerConfig { + private final String name; + private final String value; + private final ConfigEntry.ConfigSource source; + private final boolean isSensitive; + private final boolean isReadOnly; + private final List synonyms; +} diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ClusterService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ClusterService.java index 84b13767b5..4bfbea56fa 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ClusterService.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ClusterService.java @@ -8,6 +8,7 @@ import com.provectus.kafka.ui.exception.ValidationException; import com.provectus.kafka.ui.mapper.ClusterMapper; import com.provectus.kafka.ui.mapper.DescribeLogDirsMapper; import com.provectus.kafka.ui.model.Broker; +import com.provectus.kafka.ui.model.BrokerConfig; import com.provectus.kafka.ui.model.BrokerMetrics; import com.provectus.kafka.ui.model.BrokersLogdirs; import com.provectus.kafka.ui.model.Cluster; @@ -223,6 +224,13 @@ public class ClusterService { .flatMapMany(Flux::fromIterable); } + public Mono> getBrokerConfig(String clusterName, Integer brokerId) { + return Mono.justOrEmpty(clustersStorage.getClusterByName(clusterName)) + .switchIfEmpty(Mono.error(ClusterNotFoundException::new)) + .flatMap(c -> kafkaService.getBrokerConfigs(c, brokerId)) + .map(c -> c.stream().map(clusterMapper::toBrokerConfig).collect(Collectors.toList())); + } + @SneakyThrows public Mono updateTopic(String clusterName, String topicName, Mono topicUpdate) { diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/KafkaService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/KafkaService.java index ed7cd4e246..bfe0e7a829 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/KafkaService.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/KafkaService.java @@ -1,10 +1,13 @@ package com.provectus.kafka.ui.service; +import com.provectus.kafka.ui.exception.IllegalEntityStateException; +import com.provectus.kafka.ui.exception.NotFoundException; import com.provectus.kafka.ui.exception.TopicMetadataException; import com.provectus.kafka.ui.exception.ValidationException; import com.provectus.kafka.ui.model.CleanupPolicy; import com.provectus.kafka.ui.model.CreateTopicMessage; import com.provectus.kafka.ui.model.ExtendedAdminClient; +import com.provectus.kafka.ui.model.InternalBrokerConfig; import com.provectus.kafka.ui.model.InternalBrokerDiskUsage; import com.provectus.kafka.ui.model.InternalBrokerMetrics; import com.provectus.kafka.ui.model.InternalClusterMetrics; @@ -54,6 +57,7 @@ import org.apache.kafka.clients.admin.AlterConfigOp; import org.apache.kafka.clients.admin.Config; import org.apache.kafka.clients.admin.ConfigEntry; import org.apache.kafka.clients.admin.ConsumerGroupListing; +import org.apache.kafka.clients.admin.DescribeConfigsOptions; import org.apache.kafka.clients.admin.DescribeLogDirsResult; import org.apache.kafka.clients.admin.ListTopicsOptions; import org.apache.kafka.clients.admin.NewPartitionReassignment; @@ -328,17 +332,47 @@ public class KafkaService { return ClusterUtil.toMono(adminClient.describeConfigs(resources).all()) .map(configs -> - configs.entrySet().stream().map( - c -> Tuples.of( - c.getKey().name(), - c.getValue().entries().stream().map(ClusterUtil::mapToInternalTopicConfig) - .collect(Collectors.toList()) - ) - ).collect(Collectors.toMap( - Tuple2::getT1, - Tuple2::getT2 - )) - ); + configs.entrySet().stream().collect(Collectors.toMap( + c -> c.getKey().name(), + c -> c.getValue().entries().stream() + .map(ClusterUtil::mapToInternalTopicConfig) + .collect(Collectors.toList())))); + } + + private Mono>> loadBrokersConfig( + AdminClient adminClient, List brokersIds) { + List resources = brokersIds.stream() + .map(brokerId -> new ConfigResource(ConfigResource.Type.BROKER, Integer.toString(brokerId))) + .collect(Collectors.toList()); + + return ClusterUtil.toMono(adminClient.describeConfigs(resources, + new DescribeConfigsOptions().includeSynonyms(true)).all()) + .map(configs -> + configs.entrySet().stream().collect(Collectors.toMap( + c -> c.getKey().name(), + c -> c.getValue().entries().stream() + .map(ClusterUtil::mapToInternalBrokerConfig) + .collect(Collectors.toList())))); + } + + private Mono> loadBrokersConfig( + AdminClient adminClient, Integer brokerId) { + return loadBrokersConfig(adminClient, Collections.singletonList(brokerId)) + .map(map -> map.values().stream() + .findFirst() + .orElseThrow(() -> new IllegalEntityStateException( + String.format("Config for broker %s not found", brokerId)))); + } + + public Mono> getBrokerConfigs(KafkaCluster cluster, Integer brokerId) { + return getOrCreateAdminClient(cluster) + .flatMap(adminClient -> { + if (!cluster.getBrokers().contains(brokerId)) { + return Mono.error( + new NotFoundException(String.format("Broker with id %s not found", brokerId))); + } + return loadBrokersConfig(adminClient.getAdminClient(), brokerId); + }); } public Mono> getConsumerGroupsInternal( diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/ClusterUtil.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/ClusterUtil.java index 8248372b05..2f8d53b3b5 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/ClusterUtil.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/ClusterUtil.java @@ -9,6 +9,7 @@ import com.provectus.kafka.ui.model.ConsumerGroupDetails; import com.provectus.kafka.ui.model.ConsumerGroupState; import com.provectus.kafka.ui.model.ConsumerGroupTopicPartition; import com.provectus.kafka.ui.model.ExtendedAdminClient; +import com.provectus.kafka.ui.model.InternalBrokerConfig; import com.provectus.kafka.ui.model.InternalConsumerGroup; import com.provectus.kafka.ui.model.InternalPartition; import com.provectus.kafka.ui.model.InternalReplica; @@ -203,6 +204,17 @@ public class ClusterUtil { return builder.build(); } + public static InternalBrokerConfig mapToInternalBrokerConfig(ConfigEntry configEntry) { + InternalBrokerConfig.InternalBrokerConfigBuilder builder = InternalBrokerConfig.builder() + .name(configEntry.name()) + .value(configEntry.value()) + .source(configEntry.source()) + .isReadOnly(configEntry.isReadOnly()) + .isSensitive(configEntry.isSensitive()) + .synonyms(configEntry.synonyms()); + return builder.build(); + } + public static InternalTopic mapToInternalTopic(TopicDescription topicDescription) { var topic = InternalTopic.builder(); topic.internal( diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/KafkaConsumerTests.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/KafkaConsumerTests.java index c1dfd6bd6b..80c8330419 100644 --- a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/KafkaConsumerTests.java +++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/KafkaConsumerTests.java @@ -1,11 +1,13 @@ package com.provectus.kafka.ui; +import com.provectus.kafka.ui.model.BrokerConfig; import com.provectus.kafka.ui.model.PartitionsIncrease; import com.provectus.kafka.ui.model.PartitionsIncreaseResponse; import com.provectus.kafka.ui.model.TopicCreation; import com.provectus.kafka.ui.model.TopicDetails; import com.provectus.kafka.ui.model.TopicMessage; import com.provectus.kafka.ui.producer.KafkaTestProducer; +import java.util.List; import java.util.Map; import java.util.UUID; import java.util.stream.Stream; @@ -126,4 +128,40 @@ public class KafkaConsumerTests extends AbstractBaseTest { .expectStatus() .isNotFound(); } + + @Test + public void shouldReturnConfigsForBroker() { + var topicName = UUID.randomUUID().toString(); + + List configs = webTestClient.get() + .uri("/api/clusters/{clusterName}/brokers/{id}/configs", + LOCAL, + 1) + .exchange() + .expectStatus() + .isOk() + .expectBodyList(BrokerConfig.class) + .returnResult() + .getResponseBody(); + + assert configs != null; + assert !configs.isEmpty(); + Assertions.assertNotEquals(null, configs.get(0).getName()); + Assertions.assertNotEquals(null, configs.get(0).getIsReadOnly()); + Assertions.assertNotEquals(null, configs.get(0).getIsSensitive()); + Assertions.assertNotEquals(null, configs.get(0).getSource()); + } + + @Test + public void shouldReturn404ForNonExistingBroker() { + var topicName = UUID.randomUUID().toString(); + + webTestClient.get() + .uri("/api/clusters/{clusterName}/brokers/{id}/configs", + LOCAL, + 0) + .exchange() + .expectStatus() + .isNotFound(); + } } diff --git a/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml b/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml index 9135b039ab..bc849e6350 100644 --- a/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml +++ b/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml @@ -77,6 +77,35 @@ paths: items: $ref: '#/components/schemas/Broker' + /api/clusters/{clusterName}/brokers/{id}/configs: + get: + tags: + - Brokers + summary: getBrokerConfig + operationId: getBrokerConfig + parameters: + - name: clusterName + in: path + required: true + schema: + type: string + - name: id + in: path + required: true + schema: + type: integer + responses: + 200: + description: OK + content: + application/json: + schema: + type: array + items: + $ref: '#/components/schemas/BrokerConfig' + 404: + description: Not found + /api/clusters/{clusterName}/metrics: get: tags: @@ -2334,4 +2363,49 @@ components: type: string required: - totalReplicationFactor - - topicName \ No newline at end of file + - topicName + + BrokerConfig: + type: object + properties: + name: + type: string + value: + type: string + source: + $ref: '#/components/schemas/ConfigSource' + isSensitive: + type: boolean + isReadOnly: + type: boolean + synonyms: + type: array + items: + $ref: '#/components/schemas/ConfigSynonym' + required: + - name + - value + - source + - isSensitive + - isReadOnly + + ConfigSource: + type: string + enum: + - DYNAMIC_TOPIC_CONFIG + - DYNAMIC_BROKER_LOGGER_CONFIG + - DYNAMIC_BROKER_CONFIG + - DYNAMIC_DEFAULT_BROKER_CONFIG + - STATIC_BROKER_CONFIG + - DEFAULT_CONFIG + - UNKNOWN + + ConfigSynonym: + type: object + properties: + name: + type: string + value: + type: string + source: + $ref: '#/components/schemas/ConfigSource' \ No newline at end of file