ISSUE-2787:

1. Getting topic config defaults from server response
2. rm KafkaConstants
3. doc field added to TopicConfigDTO
This commit is contained in:
iliax 2022-10-20 19:37:42 +04:00
parent dc1785e338
commit 4f3ae69a77
6 changed files with 24 additions and 78 deletions

View file

@ -1,8 +1,5 @@
package com.provectus.kafka.ui.model;
import static com.provectus.kafka.ui.util.KafkaConstants.TOPIC_DEFAULT_CONFIGS;
import static org.apache.kafka.common.config.TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG;
import java.util.List;
import lombok.Builder;
import lombok.Data;
@ -19,6 +16,7 @@ public class InternalTopicConfig {
private final boolean isSensitive;
private final boolean isReadOnly;
private final List<ConfigEntry.ConfigSynonym> synonyms;
private final String doc;
public static InternalTopicConfig from(ConfigEntry configEntry) {
InternalTopicConfig.InternalTopicConfigBuilder builder = InternalTopicConfig.builder()
@ -27,11 +25,22 @@ public class InternalTopicConfig {
.source(configEntry.source())
.isReadOnly(configEntry.isReadOnly())
.isSensitive(configEntry.isSensitive())
.synonyms(configEntry.synonyms());
if (configEntry.name().equals(MESSAGE_FORMAT_VERSION_CONFIG)) {
.synonyms(configEntry.synonyms())
.doc(configEntry.documentation());
if (configEntry.source() == ConfigEntry.ConfigSource.DEFAULT_CONFIG) {
// this is important case, because for some configs like "confluent.*" no synonyms returned, but
// they are set by default and "source" == DEFAULT_CONFIG
builder.defaultValue(configEntry.value());
} else {
builder.defaultValue(TOPIC_DEFAULT_CONFIGS.get(configEntry.name()));
// normally by default first entity of synonyms values will be used.
configEntry.synonyms().stream()
// skipping DYNAMIC_TOPIC_CONFIG value - which is explicitly set value when
// topic was created (not default), see ConfigEntry.synonyms() doc
.filter(s -> s.source() != ConfigEntry.ConfigSource.DYNAMIC_TOPIC_CONFIG)
.map(ConfigEntry.ConfigSynonym::value)
.findFirst()
.ifPresent(builder::defaultValue);
}
return builder.build();
}

View file

@ -145,20 +145,20 @@ public class ReactiveAdminClient implements Closeable {
}
public Mono<Map<String, List<ConfigEntry>>> getTopicsConfig() {
return listTopics(true).flatMap(this::getTopicsConfig);
return listTopics(true).flatMap(topics -> getTopicsConfig(topics, false));
}
public Mono<Map<String, List<ConfigEntry>>> getTopicsConfig(Collection<String> topicNames) {
public Mono<Map<String, List<ConfigEntry>>> getTopicsConfig(Collection<String> topicNames, boolean includeDoc) {
// we need to partition calls, because it can lead to AdminClient timeouts in case of large topics count
return partitionCalls(
topicNames,
200,
this::getTopicsConfigImpl,
part -> getTopicsConfigImpl(part, includeDoc),
(m1, m2) -> ImmutableMap.<String, List<ConfigEntry>>builder().putAll(m1).putAll(m2).build()
);
}
private Mono<Map<String, List<ConfigEntry>>> getTopicsConfigImpl(Collection<String> topicNames) {
private Mono<Map<String, List<ConfigEntry>>> getTopicsConfigImpl(Collection<String> topicNames, boolean includeDoc) {
List<ConfigResource> resources = topicNames.stream()
.map(topicName -> new ConfigResource(ConfigResource.Type.TOPIC, topicName))
.collect(toList());
@ -166,7 +166,7 @@ public class ReactiveAdminClient implements Closeable {
return toMonoWithExceptionFilter(
client.describeConfigs(
resources,
new DescribeConfigsOptions().includeSynonyms(true)).values(),
new DescribeConfigsOptions().includeSynonyms(true).includeDocumentation(includeDoc)).values(),
UnknownTopicOrPartitionException.class
).map(config -> config.entrySet().stream()
.collect(toMap(

View file

@ -68,7 +68,7 @@ public class TopicsService {
}
return adminClientService.get(c)
.flatMap(ac ->
ac.describeTopics(topics).zipWith(ac.getTopicsConfig(topics),
ac.describeTopics(topics).zipWith(ac.getTopicsConfig(topics, false),
(descriptions, configs) -> {
statisticsCache.update(c, descriptions, configs);
return getPartitionOffsets(descriptions, ac).map(offsets -> {
@ -160,7 +160,7 @@ public class TopicsService {
public Mono<List<ConfigEntry>> getTopicConfigs(KafkaCluster cluster, String topicName) {
return adminClientService.get(cluster)
.flatMap(ac -> ac.getTopicsConfig(List.of(topicName)))
.flatMap(ac -> ac.getTopicsConfig(List.of(topicName), true))
.map(m -> m.values().stream().findFirst().orElseThrow(TopicNotFoundException::new));
}

View file

@ -1,65 +0,0 @@
package com.provectus.kafka.ui.util;
import static org.apache.kafka.common.config.TopicConfig.CLEANUP_POLICY_CONFIG;
import static org.apache.kafka.common.config.TopicConfig.CLEANUP_POLICY_DELETE;
import static org.apache.kafka.common.config.TopicConfig.COMPRESSION_TYPE_CONFIG;
import static org.apache.kafka.common.config.TopicConfig.DELETE_RETENTION_MS_CONFIG;
import static org.apache.kafka.common.config.TopicConfig.FILE_DELETE_DELAY_MS_CONFIG;
import static org.apache.kafka.common.config.TopicConfig.FLUSH_MESSAGES_INTERVAL_CONFIG;
import static org.apache.kafka.common.config.TopicConfig.FLUSH_MS_CONFIG;
import static org.apache.kafka.common.config.TopicConfig.INDEX_INTERVAL_BYTES_CONFIG;
import static org.apache.kafka.common.config.TopicConfig.MAX_COMPACTION_LAG_MS_CONFIG;
import static org.apache.kafka.common.config.TopicConfig.MAX_MESSAGE_BYTES_CONFIG;
import static org.apache.kafka.common.config.TopicConfig.MESSAGE_DOWNCONVERSION_ENABLE_CONFIG;
import static org.apache.kafka.common.config.TopicConfig.MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS_CONFIG;
import static org.apache.kafka.common.config.TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG;
import static org.apache.kafka.common.config.TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_CONFIG;
import static org.apache.kafka.common.config.TopicConfig.MIN_COMPACTION_LAG_MS_CONFIG;
import static org.apache.kafka.common.config.TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG;
import static org.apache.kafka.common.config.TopicConfig.PREALLOCATE_CONFIG;
import static org.apache.kafka.common.config.TopicConfig.RETENTION_BYTES_CONFIG;
import static org.apache.kafka.common.config.TopicConfig.RETENTION_MS_CONFIG;
import static org.apache.kafka.common.config.TopicConfig.SEGMENT_BYTES_CONFIG;
import static org.apache.kafka.common.config.TopicConfig.SEGMENT_INDEX_BYTES_CONFIG;
import static org.apache.kafka.common.config.TopicConfig.SEGMENT_JITTER_MS_CONFIG;
import static org.apache.kafka.common.config.TopicConfig.SEGMENT_MS_CONFIG;
import static org.apache.kafka.common.config.TopicConfig.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG;
import java.util.AbstractMap;
import java.util.Map;
public final class KafkaConstants {
private static final String LONG_MAX_STRING = Long.toString(Long.MAX_VALUE);
public static final Map<String, String> TOPIC_DEFAULT_CONFIGS = Map.ofEntries(
new AbstractMap.SimpleEntry<>(CLEANUP_POLICY_CONFIG, CLEANUP_POLICY_DELETE),
new AbstractMap.SimpleEntry<>(COMPRESSION_TYPE_CONFIG, "producer"),
new AbstractMap.SimpleEntry<>(DELETE_RETENTION_MS_CONFIG, "86400000"),
new AbstractMap.SimpleEntry<>(FILE_DELETE_DELAY_MS_CONFIG, "60000"),
new AbstractMap.SimpleEntry<>(FLUSH_MESSAGES_INTERVAL_CONFIG, LONG_MAX_STRING),
new AbstractMap.SimpleEntry<>(FLUSH_MS_CONFIG, LONG_MAX_STRING),
new AbstractMap.SimpleEntry<>("follower.replication.throttled.replicas", ""),
new AbstractMap.SimpleEntry<>(INDEX_INTERVAL_BYTES_CONFIG, "4096"),
new AbstractMap.SimpleEntry<>("leader.replication.throttled.replicas", ""),
new AbstractMap.SimpleEntry<>(MAX_COMPACTION_LAG_MS_CONFIG, LONG_MAX_STRING),
new AbstractMap.SimpleEntry<>(MAX_MESSAGE_BYTES_CONFIG, "1000012"),
new AbstractMap.SimpleEntry<>(MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS_CONFIG, LONG_MAX_STRING),
new AbstractMap.SimpleEntry<>(MESSAGE_TIMESTAMP_TYPE_CONFIG, "CreateTime"),
new AbstractMap.SimpleEntry<>(MIN_CLEANABLE_DIRTY_RATIO_CONFIG, "0.5"),
new AbstractMap.SimpleEntry<>(MIN_COMPACTION_LAG_MS_CONFIG, "0"),
new AbstractMap.SimpleEntry<>(MIN_IN_SYNC_REPLICAS_CONFIG, "1"),
new AbstractMap.SimpleEntry<>(PREALLOCATE_CONFIG, "false"),
new AbstractMap.SimpleEntry<>(RETENTION_BYTES_CONFIG, "-1"),
new AbstractMap.SimpleEntry<>(RETENTION_MS_CONFIG, "604800000"),
new AbstractMap.SimpleEntry<>(SEGMENT_BYTES_CONFIG, "1073741824"),
new AbstractMap.SimpleEntry<>(SEGMENT_INDEX_BYTES_CONFIG, "10485760"),
new AbstractMap.SimpleEntry<>(SEGMENT_JITTER_MS_CONFIG, "0"),
new AbstractMap.SimpleEntry<>(SEGMENT_MS_CONFIG, "604800000"),
new AbstractMap.SimpleEntry<>(UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG, "false"),
new AbstractMap.SimpleEntry<>(MESSAGE_DOWNCONVERSION_ENABLE_CONFIG, "true")
);
private KafkaConstants() {
}
}

View file

@ -2260,6 +2260,8 @@ components:
type: array
items:
$ref: "#/components/schemas/ConfigSynonym"
doc:
type: string
required:
- name