diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/InternalTopicConfig.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/InternalTopicConfig.java index 294894ebc2..d061dd4981 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/InternalTopicConfig.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/InternalTopicConfig.java @@ -1,8 +1,5 @@ package com.provectus.kafka.ui.model; -import static com.provectus.kafka.ui.util.KafkaConstants.TOPIC_DEFAULT_CONFIGS; -import static org.apache.kafka.common.config.TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG; - import java.util.List; import lombok.Builder; import lombok.Data; @@ -19,6 +16,7 @@ public class InternalTopicConfig { private final boolean isSensitive; private final boolean isReadOnly; private final List synonyms; + private final String doc; public static InternalTopicConfig from(ConfigEntry configEntry) { InternalTopicConfig.InternalTopicConfigBuilder builder = InternalTopicConfig.builder() @@ -27,11 +25,22 @@ public class InternalTopicConfig { .source(configEntry.source()) .isReadOnly(configEntry.isReadOnly()) .isSensitive(configEntry.isSensitive()) - .synonyms(configEntry.synonyms()); - if (configEntry.name().equals(MESSAGE_FORMAT_VERSION_CONFIG)) { + .synonyms(configEntry.synonyms()) + .doc(configEntry.documentation()); + + if (configEntry.source() == ConfigEntry.ConfigSource.DEFAULT_CONFIG) { + // this is important case, because for some configs like "confluent.*" no synonyms returned, but + // they are set by default and "source" == DEFAULT_CONFIG builder.defaultValue(configEntry.value()); } else { - builder.defaultValue(TOPIC_DEFAULT_CONFIGS.get(configEntry.name())); + // normally by default first entity of synonyms values will be used. + configEntry.synonyms().stream() + // skipping DYNAMIC_TOPIC_CONFIG value - which is explicitly set value when + // topic was created (not default), see ConfigEntry.synonyms() doc + .filter(s -> s.source() != ConfigEntry.ConfigSource.DYNAMIC_TOPIC_CONFIG) + .map(ConfigEntry.ConfigSynonym::value) + .findFirst() + .ifPresent(builder::defaultValue); } return builder.build(); } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ReactiveAdminClient.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ReactiveAdminClient.java index c08b839410..3ef654025d 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ReactiveAdminClient.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ReactiveAdminClient.java @@ -145,20 +145,20 @@ public class ReactiveAdminClient implements Closeable { } public Mono>> getTopicsConfig() { - return listTopics(true).flatMap(this::getTopicsConfig); + return listTopics(true).flatMap(topics -> getTopicsConfig(topics, false)); } - public Mono>> getTopicsConfig(Collection topicNames) { + public Mono>> getTopicsConfig(Collection topicNames, boolean includeDoc) { // we need to partition calls, because it can lead to AdminClient timeouts in case of large topics count return partitionCalls( topicNames, 200, - this::getTopicsConfigImpl, + part -> getTopicsConfigImpl(part, includeDoc), (m1, m2) -> ImmutableMap.>builder().putAll(m1).putAll(m2).build() ); } - private Mono>> getTopicsConfigImpl(Collection topicNames) { + private Mono>> getTopicsConfigImpl(Collection topicNames, boolean includeDoc) { List resources = topicNames.stream() .map(topicName -> new ConfigResource(ConfigResource.Type.TOPIC, topicName)) .collect(toList()); @@ -166,7 +166,7 @@ public class ReactiveAdminClient implements Closeable { return toMonoWithExceptionFilter( client.describeConfigs( resources, - new DescribeConfigsOptions().includeSynonyms(true)).values(), + new DescribeConfigsOptions().includeSynonyms(true).includeDocumentation(includeDoc)).values(), UnknownTopicOrPartitionException.class ).map(config -> config.entrySet().stream() .collect(toMap( diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/TopicsService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/TopicsService.java index 2dffe0de75..8badcebc36 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/TopicsService.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/TopicsService.java @@ -68,7 +68,7 @@ public class TopicsService { } return adminClientService.get(c) .flatMap(ac -> - ac.describeTopics(topics).zipWith(ac.getTopicsConfig(topics), + ac.describeTopics(topics).zipWith(ac.getTopicsConfig(topics, false), (descriptions, configs) -> { statisticsCache.update(c, descriptions, configs); return getPartitionOffsets(descriptions, ac).map(offsets -> { @@ -160,7 +160,7 @@ public class TopicsService { public Mono> getTopicConfigs(KafkaCluster cluster, String topicName) { return adminClientService.get(cluster) - .flatMap(ac -> ac.getTopicsConfig(List.of(topicName))) + .flatMap(ac -> ac.getTopicsConfig(List.of(topicName), true)) .map(m -> m.values().stream().findFirst().orElseThrow(TopicNotFoundException::new)); } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/ClusterUtil.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/ClusterUtil.java deleted file mode 100644 index e69de29bb2..0000000000 diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/KafkaConstants.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/KafkaConstants.java deleted file mode 100644 index aa482c57b5..0000000000 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/util/KafkaConstants.java +++ /dev/null @@ -1,65 +0,0 @@ -package com.provectus.kafka.ui.util; - -import static org.apache.kafka.common.config.TopicConfig.CLEANUP_POLICY_CONFIG; -import static org.apache.kafka.common.config.TopicConfig.CLEANUP_POLICY_DELETE; -import static org.apache.kafka.common.config.TopicConfig.COMPRESSION_TYPE_CONFIG; -import static org.apache.kafka.common.config.TopicConfig.DELETE_RETENTION_MS_CONFIG; -import static org.apache.kafka.common.config.TopicConfig.FILE_DELETE_DELAY_MS_CONFIG; -import static org.apache.kafka.common.config.TopicConfig.FLUSH_MESSAGES_INTERVAL_CONFIG; -import static org.apache.kafka.common.config.TopicConfig.FLUSH_MS_CONFIG; -import static org.apache.kafka.common.config.TopicConfig.INDEX_INTERVAL_BYTES_CONFIG; -import static org.apache.kafka.common.config.TopicConfig.MAX_COMPACTION_LAG_MS_CONFIG; -import static org.apache.kafka.common.config.TopicConfig.MAX_MESSAGE_BYTES_CONFIG; -import static org.apache.kafka.common.config.TopicConfig.MESSAGE_DOWNCONVERSION_ENABLE_CONFIG; -import static org.apache.kafka.common.config.TopicConfig.MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS_CONFIG; -import static org.apache.kafka.common.config.TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG; -import static org.apache.kafka.common.config.TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_CONFIG; -import static org.apache.kafka.common.config.TopicConfig.MIN_COMPACTION_LAG_MS_CONFIG; -import static org.apache.kafka.common.config.TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG; -import static org.apache.kafka.common.config.TopicConfig.PREALLOCATE_CONFIG; -import static org.apache.kafka.common.config.TopicConfig.RETENTION_BYTES_CONFIG; -import static org.apache.kafka.common.config.TopicConfig.RETENTION_MS_CONFIG; -import static org.apache.kafka.common.config.TopicConfig.SEGMENT_BYTES_CONFIG; -import static org.apache.kafka.common.config.TopicConfig.SEGMENT_INDEX_BYTES_CONFIG; -import static org.apache.kafka.common.config.TopicConfig.SEGMENT_JITTER_MS_CONFIG; -import static org.apache.kafka.common.config.TopicConfig.SEGMENT_MS_CONFIG; -import static org.apache.kafka.common.config.TopicConfig.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG; - -import java.util.AbstractMap; -import java.util.Map; - -public final class KafkaConstants { - - private static final String LONG_MAX_STRING = Long.toString(Long.MAX_VALUE); - - public static final Map TOPIC_DEFAULT_CONFIGS = Map.ofEntries( - new AbstractMap.SimpleEntry<>(CLEANUP_POLICY_CONFIG, CLEANUP_POLICY_DELETE), - new AbstractMap.SimpleEntry<>(COMPRESSION_TYPE_CONFIG, "producer"), - new AbstractMap.SimpleEntry<>(DELETE_RETENTION_MS_CONFIG, "86400000"), - new AbstractMap.SimpleEntry<>(FILE_DELETE_DELAY_MS_CONFIG, "60000"), - new AbstractMap.SimpleEntry<>(FLUSH_MESSAGES_INTERVAL_CONFIG, LONG_MAX_STRING), - new AbstractMap.SimpleEntry<>(FLUSH_MS_CONFIG, LONG_MAX_STRING), - new AbstractMap.SimpleEntry<>("follower.replication.throttled.replicas", ""), - new AbstractMap.SimpleEntry<>(INDEX_INTERVAL_BYTES_CONFIG, "4096"), - new AbstractMap.SimpleEntry<>("leader.replication.throttled.replicas", ""), - new AbstractMap.SimpleEntry<>(MAX_COMPACTION_LAG_MS_CONFIG, LONG_MAX_STRING), - new AbstractMap.SimpleEntry<>(MAX_MESSAGE_BYTES_CONFIG, "1000012"), - new AbstractMap.SimpleEntry<>(MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS_CONFIG, LONG_MAX_STRING), - new AbstractMap.SimpleEntry<>(MESSAGE_TIMESTAMP_TYPE_CONFIG, "CreateTime"), - new AbstractMap.SimpleEntry<>(MIN_CLEANABLE_DIRTY_RATIO_CONFIG, "0.5"), - new AbstractMap.SimpleEntry<>(MIN_COMPACTION_LAG_MS_CONFIG, "0"), - new AbstractMap.SimpleEntry<>(MIN_IN_SYNC_REPLICAS_CONFIG, "1"), - new AbstractMap.SimpleEntry<>(PREALLOCATE_CONFIG, "false"), - new AbstractMap.SimpleEntry<>(RETENTION_BYTES_CONFIG, "-1"), - new AbstractMap.SimpleEntry<>(RETENTION_MS_CONFIG, "604800000"), - new AbstractMap.SimpleEntry<>(SEGMENT_BYTES_CONFIG, "1073741824"), - new AbstractMap.SimpleEntry<>(SEGMENT_INDEX_BYTES_CONFIG, "10485760"), - new AbstractMap.SimpleEntry<>(SEGMENT_JITTER_MS_CONFIG, "0"), - new AbstractMap.SimpleEntry<>(SEGMENT_MS_CONFIG, "604800000"), - new AbstractMap.SimpleEntry<>(UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG, "false"), - new AbstractMap.SimpleEntry<>(MESSAGE_DOWNCONVERSION_ENABLE_CONFIG, "true") - ); - - private KafkaConstants() { - } -} \ No newline at end of file diff --git a/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml b/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml index d07c24a61b..302cf84c33 100644 --- a/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml +++ b/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml @@ -2260,6 +2260,8 @@ components: type: array items: $ref: "#/components/schemas/ConfigSynonym" + doc: + type: string required: - name