ISSUE-2787: Setting default config values properly (#2788)
ISSUE-2787: 1. Getting topic config defaults from server response 2. rm KafkaConstants 3. doc field added to TopicConfigDTO 4. ReactiveAdminClient.SupportedFeature refactor
This commit is contained in:
parent
81072541a8
commit
9a3cbfa14b
7 changed files with 51 additions and 90 deletions
|
@ -1,8 +1,5 @@
|
|||
package com.provectus.kafka.ui.model;
|
||||
|
||||
import static com.provectus.kafka.ui.util.KafkaConstants.TOPIC_DEFAULT_CONFIGS;
|
||||
import static org.apache.kafka.common.config.TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG;
|
||||
|
||||
import java.util.List;
|
||||
import lombok.Builder;
|
||||
import lombok.Data;
|
||||
|
@ -19,6 +16,7 @@ public class InternalTopicConfig {
|
|||
private final boolean isSensitive;
|
||||
private final boolean isReadOnly;
|
||||
private final List<ConfigEntry.ConfigSynonym> synonyms;
|
||||
private final String doc;
|
||||
|
||||
public static InternalTopicConfig from(ConfigEntry configEntry) {
|
||||
InternalTopicConfig.InternalTopicConfigBuilder builder = InternalTopicConfig.builder()
|
||||
|
@ -27,11 +25,22 @@ public class InternalTopicConfig {
|
|||
.source(configEntry.source())
|
||||
.isReadOnly(configEntry.isReadOnly())
|
||||
.isSensitive(configEntry.isSensitive())
|
||||
.synonyms(configEntry.synonyms());
|
||||
if (configEntry.name().equals(MESSAGE_FORMAT_VERSION_CONFIG)) {
|
||||
.synonyms(configEntry.synonyms())
|
||||
.doc(configEntry.documentation());
|
||||
|
||||
if (configEntry.source() == ConfigEntry.ConfigSource.DEFAULT_CONFIG) {
|
||||
// this is important case, because for some configs like "confluent.*" no synonyms returned, but
|
||||
// they are set by default and "source" == DEFAULT_CONFIG
|
||||
builder.defaultValue(configEntry.value());
|
||||
} else {
|
||||
builder.defaultValue(TOPIC_DEFAULT_CONFIGS.get(configEntry.name()));
|
||||
// normally by default first entity of synonyms values will be used.
|
||||
configEntry.synonyms().stream()
|
||||
// skipping DYNAMIC_TOPIC_CONFIG value - which is explicitly set value when
|
||||
// topic was created (not default), see ConfigEntry.synonyms() doc
|
||||
.filter(s -> s.source() != ConfigEntry.ConfigSource.DYNAMIC_TOPIC_CONFIG)
|
||||
.map(ConfigEntry.ConfigSynonym::value)
|
||||
.findFirst()
|
||||
.ifPresent(builder::defaultValue);
|
||||
}
|
||||
return builder.build();
|
||||
}
|
||||
|
|
|
@ -12,6 +12,7 @@ import com.provectus.kafka.ui.util.MapUtil;
|
|||
import com.provectus.kafka.ui.util.NumberUtil;
|
||||
import java.io.Closeable;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
|
@ -69,8 +70,24 @@ import reactor.util.function.Tuples;
|
|||
public class ReactiveAdminClient implements Closeable {
|
||||
|
||||
private enum SupportedFeature {
|
||||
INCREMENTAL_ALTER_CONFIGS,
|
||||
ALTER_CONFIGS
|
||||
INCREMENTAL_ALTER_CONFIGS(2.3f),
|
||||
CONFIG_DOCUMENTATION_RETRIEVAL(2.6f);
|
||||
|
||||
private final float sinceVersion;
|
||||
|
||||
SupportedFeature(float sinceVersion) {
|
||||
this.sinceVersion = sinceVersion;
|
||||
}
|
||||
|
||||
static Set<SupportedFeature> forVersion(float kafkaVersion) {
|
||||
return Arrays.stream(SupportedFeature.values())
|
||||
.filter(f -> kafkaVersion >= f.sinceVersion)
|
||||
.collect(Collectors.toSet());
|
||||
}
|
||||
|
||||
static Set<SupportedFeature> defaultFeatures() {
|
||||
return Set.of();
|
||||
}
|
||||
}
|
||||
|
||||
@Value
|
||||
|
@ -88,18 +105,15 @@ public class ReactiveAdminClient implements Closeable {
|
|||
new ReactiveAdminClient(
|
||||
adminClient,
|
||||
ver,
|
||||
Set.of(getSupportedUpdateFeatureForVersion(ver))));
|
||||
getSupportedUpdateFeaturesForVersion(ver)));
|
||||
}
|
||||
|
||||
private static SupportedFeature getSupportedUpdateFeatureForVersion(String versionStr) {
|
||||
private static Set<SupportedFeature> getSupportedUpdateFeaturesForVersion(String versionStr) {
|
||||
try {
|
||||
float version = NumberUtil.parserClusterVersion(versionStr);
|
||||
return version <= 2.3f
|
||||
? SupportedFeature.ALTER_CONFIGS
|
||||
: SupportedFeature.INCREMENTAL_ALTER_CONFIGS;
|
||||
return SupportedFeature.forVersion(version);
|
||||
} catch (NumberFormatException e) {
|
||||
log.info("Assuming non-incremental alter configs due to version parsing error");
|
||||
return SupportedFeature.ALTER_CONFIGS;
|
||||
return SupportedFeature.defaultFeatures();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -145,20 +159,21 @@ public class ReactiveAdminClient implements Closeable {
|
|||
}
|
||||
|
||||
public Mono<Map<String, List<ConfigEntry>>> getTopicsConfig() {
|
||||
return listTopics(true).flatMap(this::getTopicsConfig);
|
||||
return listTopics(true).flatMap(topics -> getTopicsConfig(topics, false));
|
||||
}
|
||||
|
||||
public Mono<Map<String, List<ConfigEntry>>> getTopicsConfig(Collection<String> topicNames) {
|
||||
public Mono<Map<String, List<ConfigEntry>>> getTopicsConfig(Collection<String> topicNames, boolean includeDoc) {
|
||||
var includeDocFixed = features.contains(SupportedFeature.CONFIG_DOCUMENTATION_RETRIEVAL) && includeDoc;
|
||||
// we need to partition calls, because it can lead to AdminClient timeouts in case of large topics count
|
||||
return partitionCalls(
|
||||
topicNames,
|
||||
200,
|
||||
this::getTopicsConfigImpl,
|
||||
part -> getTopicsConfigImpl(part, includeDocFixed),
|
||||
(m1, m2) -> ImmutableMap.<String, List<ConfigEntry>>builder().putAll(m1).putAll(m2).build()
|
||||
);
|
||||
}
|
||||
|
||||
private Mono<Map<String, List<ConfigEntry>>> getTopicsConfigImpl(Collection<String> topicNames) {
|
||||
private Mono<Map<String, List<ConfigEntry>>> getTopicsConfigImpl(Collection<String> topicNames, boolean includeDoc) {
|
||||
List<ConfigResource> resources = topicNames.stream()
|
||||
.map(topicName -> new ConfigResource(ConfigResource.Type.TOPIC, topicName))
|
||||
.collect(toList());
|
||||
|
@ -166,7 +181,7 @@ public class ReactiveAdminClient implements Closeable {
|
|||
return toMonoWithExceptionFilter(
|
||||
client.describeConfigs(
|
||||
resources,
|
||||
new DescribeConfigsOptions().includeSynonyms(true)).values(),
|
||||
new DescribeConfigsOptions().includeSynonyms(true).includeDocumentation(includeDoc)).values(),
|
||||
UnknownTopicOrPartitionException.class
|
||||
).map(config -> config.entrySet().stream()
|
||||
.collect(toMap(
|
||||
|
|
|
@ -68,7 +68,7 @@ public class TopicsService {
|
|||
}
|
||||
return adminClientService.get(c)
|
||||
.flatMap(ac ->
|
||||
ac.describeTopics(topics).zipWith(ac.getTopicsConfig(topics),
|
||||
ac.describeTopics(topics).zipWith(ac.getTopicsConfig(topics, false),
|
||||
(descriptions, configs) -> {
|
||||
statisticsCache.update(c, descriptions, configs);
|
||||
return getPartitionOffsets(descriptions, ac).map(offsets -> {
|
||||
|
@ -160,7 +160,7 @@ public class TopicsService {
|
|||
|
||||
public Mono<List<ConfigEntry>> getTopicConfigs(KafkaCluster cluster, String topicName) {
|
||||
return adminClientService.get(cluster)
|
||||
.flatMap(ac -> ac.getTopicsConfig(List.of(topicName)))
|
||||
.flatMap(ac -> ac.getTopicsConfig(List.of(topicName), true))
|
||||
.map(m -> m.values().stream().findFirst().orElseThrow(TopicNotFoundException::new));
|
||||
}
|
||||
|
||||
|
|
|
@ -1,65 +0,0 @@
|
|||
package com.provectus.kafka.ui.util;
|
||||
|
||||
import static org.apache.kafka.common.config.TopicConfig.CLEANUP_POLICY_CONFIG;
|
||||
import static org.apache.kafka.common.config.TopicConfig.CLEANUP_POLICY_DELETE;
|
||||
import static org.apache.kafka.common.config.TopicConfig.COMPRESSION_TYPE_CONFIG;
|
||||
import static org.apache.kafka.common.config.TopicConfig.DELETE_RETENTION_MS_CONFIG;
|
||||
import static org.apache.kafka.common.config.TopicConfig.FILE_DELETE_DELAY_MS_CONFIG;
|
||||
import static org.apache.kafka.common.config.TopicConfig.FLUSH_MESSAGES_INTERVAL_CONFIG;
|
||||
import static org.apache.kafka.common.config.TopicConfig.FLUSH_MS_CONFIG;
|
||||
import static org.apache.kafka.common.config.TopicConfig.INDEX_INTERVAL_BYTES_CONFIG;
|
||||
import static org.apache.kafka.common.config.TopicConfig.MAX_COMPACTION_LAG_MS_CONFIG;
|
||||
import static org.apache.kafka.common.config.TopicConfig.MAX_MESSAGE_BYTES_CONFIG;
|
||||
import static org.apache.kafka.common.config.TopicConfig.MESSAGE_DOWNCONVERSION_ENABLE_CONFIG;
|
||||
import static org.apache.kafka.common.config.TopicConfig.MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS_CONFIG;
|
||||
import static org.apache.kafka.common.config.TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG;
|
||||
import static org.apache.kafka.common.config.TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_CONFIG;
|
||||
import static org.apache.kafka.common.config.TopicConfig.MIN_COMPACTION_LAG_MS_CONFIG;
|
||||
import static org.apache.kafka.common.config.TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG;
|
||||
import static org.apache.kafka.common.config.TopicConfig.PREALLOCATE_CONFIG;
|
||||
import static org.apache.kafka.common.config.TopicConfig.RETENTION_BYTES_CONFIG;
|
||||
import static org.apache.kafka.common.config.TopicConfig.RETENTION_MS_CONFIG;
|
||||
import static org.apache.kafka.common.config.TopicConfig.SEGMENT_BYTES_CONFIG;
|
||||
import static org.apache.kafka.common.config.TopicConfig.SEGMENT_INDEX_BYTES_CONFIG;
|
||||
import static org.apache.kafka.common.config.TopicConfig.SEGMENT_JITTER_MS_CONFIG;
|
||||
import static org.apache.kafka.common.config.TopicConfig.SEGMENT_MS_CONFIG;
|
||||
import static org.apache.kafka.common.config.TopicConfig.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG;
|
||||
|
||||
import java.util.AbstractMap;
|
||||
import java.util.Map;
|
||||
|
||||
public final class KafkaConstants {
|
||||
|
||||
private static final String LONG_MAX_STRING = Long.toString(Long.MAX_VALUE);
|
||||
|
||||
public static final Map<String, String> TOPIC_DEFAULT_CONFIGS = Map.ofEntries(
|
||||
new AbstractMap.SimpleEntry<>(CLEANUP_POLICY_CONFIG, CLEANUP_POLICY_DELETE),
|
||||
new AbstractMap.SimpleEntry<>(COMPRESSION_TYPE_CONFIG, "producer"),
|
||||
new AbstractMap.SimpleEntry<>(DELETE_RETENTION_MS_CONFIG, "86400000"),
|
||||
new AbstractMap.SimpleEntry<>(FILE_DELETE_DELAY_MS_CONFIG, "60000"),
|
||||
new AbstractMap.SimpleEntry<>(FLUSH_MESSAGES_INTERVAL_CONFIG, LONG_MAX_STRING),
|
||||
new AbstractMap.SimpleEntry<>(FLUSH_MS_CONFIG, LONG_MAX_STRING),
|
||||
new AbstractMap.SimpleEntry<>("follower.replication.throttled.replicas", ""),
|
||||
new AbstractMap.SimpleEntry<>(INDEX_INTERVAL_BYTES_CONFIG, "4096"),
|
||||
new AbstractMap.SimpleEntry<>("leader.replication.throttled.replicas", ""),
|
||||
new AbstractMap.SimpleEntry<>(MAX_COMPACTION_LAG_MS_CONFIG, LONG_MAX_STRING),
|
||||
new AbstractMap.SimpleEntry<>(MAX_MESSAGE_BYTES_CONFIG, "1000012"),
|
||||
new AbstractMap.SimpleEntry<>(MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS_CONFIG, LONG_MAX_STRING),
|
||||
new AbstractMap.SimpleEntry<>(MESSAGE_TIMESTAMP_TYPE_CONFIG, "CreateTime"),
|
||||
new AbstractMap.SimpleEntry<>(MIN_CLEANABLE_DIRTY_RATIO_CONFIG, "0.5"),
|
||||
new AbstractMap.SimpleEntry<>(MIN_COMPACTION_LAG_MS_CONFIG, "0"),
|
||||
new AbstractMap.SimpleEntry<>(MIN_IN_SYNC_REPLICAS_CONFIG, "1"),
|
||||
new AbstractMap.SimpleEntry<>(PREALLOCATE_CONFIG, "false"),
|
||||
new AbstractMap.SimpleEntry<>(RETENTION_BYTES_CONFIG, "-1"),
|
||||
new AbstractMap.SimpleEntry<>(RETENTION_MS_CONFIG, "604800000"),
|
||||
new AbstractMap.SimpleEntry<>(SEGMENT_BYTES_CONFIG, "1073741824"),
|
||||
new AbstractMap.SimpleEntry<>(SEGMENT_INDEX_BYTES_CONFIG, "10485760"),
|
||||
new AbstractMap.SimpleEntry<>(SEGMENT_JITTER_MS_CONFIG, "0"),
|
||||
new AbstractMap.SimpleEntry<>(SEGMENT_MS_CONFIG, "604800000"),
|
||||
new AbstractMap.SimpleEntry<>(UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG, "false"),
|
||||
new AbstractMap.SimpleEntry<>(MESSAGE_DOWNCONVERSION_ENABLE_CONFIG, "true")
|
||||
);
|
||||
|
||||
private KafkaConstants() {
|
||||
}
|
||||
}
|
|
@ -3,10 +3,10 @@ package com.provectus.kafka.ui;
|
|||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
import static org.springframework.http.MediaType.TEXT_EVENT_STREAM;
|
||||
|
||||
import com.provectus.kafka.ui.api.model.TopicConfig;
|
||||
import com.provectus.kafka.ui.model.BrokerConfigDTO;
|
||||
import com.provectus.kafka.ui.model.PartitionsIncreaseDTO;
|
||||
import com.provectus.kafka.ui.model.PartitionsIncreaseResponseDTO;
|
||||
import com.provectus.kafka.ui.model.TopicConfigDTO;
|
||||
import com.provectus.kafka.ui.model.TopicCreationDTO;
|
||||
import com.provectus.kafka.ui.model.TopicDetailsDTO;
|
||||
import com.provectus.kafka.ui.model.TopicMessageEventDTO;
|
||||
|
@ -206,12 +206,12 @@ public class KafkaConsumerTests extends AbstractIntegrationTest {
|
|||
.expectStatus()
|
||||
.isOk();
|
||||
|
||||
List<TopicConfig> configs = webTestClient.get()
|
||||
List<TopicConfigDTO> configs = webTestClient.get()
|
||||
.uri("/api/clusters/{clusterName}/topics/{topicName}/config", LOCAL, topicName)
|
||||
.exchange()
|
||||
.expectStatus()
|
||||
.isOk()
|
||||
.expectBodyList(TopicConfig.class)
|
||||
.expectBodyList(TopicConfigDTO.class)
|
||||
.returnResult()
|
||||
.getResponseBody();
|
||||
|
||||
|
|
|
@ -2213,6 +2213,8 @@ components:
|
|||
type: array
|
||||
items:
|
||||
$ref: "#/components/schemas/ConfigSynonym"
|
||||
doc:
|
||||
type: string
|
||||
required:
|
||||
- name
|
||||
|
||||
|
|
Loading…
Add table
Reference in a new issue