|
@@ -15,6 +15,8 @@ import com.provectus.kafka.ui.exception.ValidationException;
|
|
|
import com.provectus.kafka.ui.util.KafkaVersion;
|
|
|
import com.provectus.kafka.ui.util.annotation.KafkaClientInternalsDependant;
|
|
|
import java.io.Closeable;
|
|
|
+import java.time.Duration;
|
|
|
+import java.time.temporal.ChronoUnit;
|
|
|
import java.util.ArrayList;
|
|
|
import java.util.Collection;
|
|
|
import java.util.HashMap;
|
|
@@ -129,38 +131,41 @@ public class ReactiveAdminClient implements Closeable {
|
|
|
Set<SupportedFeature> features,
|
|
|
boolean topicDeletionIsAllowed) {
|
|
|
|
|
|
- private static Mono<ConfigRelatedInfo> extract(AdminClient ac, int controllerId) {
|
|
|
- return loadBrokersConfig(ac, List.of(controllerId))
|
|
|
- .map(map -> map.isEmpty() ? List.<ConfigEntry>of() : map.get(controllerId))
|
|
|
- .flatMap(configs -> {
|
|
|
- String version = "1.0-UNKNOWN";
|
|
|
- boolean topicDeletionEnabled = true;
|
|
|
- for (ConfigEntry entry : configs) {
|
|
|
- if (entry.name().contains("inter.broker.protocol.version")) {
|
|
|
- version = entry.value();
|
|
|
- }
|
|
|
- if (entry.name().equals("delete.topic.enable")) {
|
|
|
- topicDeletionEnabled = Boolean.parseBoolean(entry.value());
|
|
|
- }
|
|
|
- }
|
|
|
- var builder = ConfigRelatedInfo.builder()
|
|
|
- .version(version)
|
|
|
- .topicDeletionIsAllowed(topicDeletionEnabled);
|
|
|
- return SupportedFeature.forVersion(ac, version)
|
|
|
- .map(features -> builder.features(features).build());
|
|
|
- });
|
|
|
+ static final Duration UPDATE_DURATION = Duration.of(1, ChronoUnit.HOURS);
|
|
|
+
|
|
|
+ private static Mono<ConfigRelatedInfo> extract(AdminClient ac) {
|
|
|
+ return ReactiveAdminClient.describeClusterImpl(ac, Set.of())
|
|
|
+ .flatMap(desc -> {
|
|
|
+ // choosing node from which we will get configs (starting with controller)
|
|
|
+ var targetNodeId = Optional.ofNullable(desc.controller)
|
|
|
+ .map(Node::id)
|
|
|
+ .orElse(desc.getNodes().iterator().next().id());
|
|
|
+ return loadBrokersConfig(ac, List.of(targetNodeId))
|
|
|
+ .map(map -> map.isEmpty() ? List.<ConfigEntry>of() : map.get(targetNodeId))
|
|
|
+ .flatMap(configs -> {
|
|
|
+ String version = "1.0-UNKNOWN";
|
|
|
+ boolean topicDeletionEnabled = true;
|
|
|
+ for (ConfigEntry entry : configs) {
|
|
|
+ if (entry.name().contains("inter.broker.protocol.version")) {
|
|
|
+ version = entry.value();
|
|
|
+ }
|
|
|
+ if (entry.name().equals("delete.topic.enable")) {
|
|
|
+ topicDeletionEnabled = Boolean.parseBoolean(entry.value());
|
|
|
+ }
|
|
|
+ }
|
|
|
+ final String finalVersion = version;
|
|
|
+ final boolean finalTopicDeletionEnabled = topicDeletionEnabled;
|
|
|
+ return SupportedFeature.forVersion(ac, version)
|
|
|
+ .map(features -> new ConfigRelatedInfo(finalVersion, features, finalTopicDeletionEnabled));
|
|
|
+ });
|
|
|
+ })
|
|
|
+ .cache(UPDATE_DURATION);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
public static Mono<ReactiveAdminClient> create(AdminClient adminClient) {
|
|
|
- return describeClusterImpl(adminClient, Set.of())
|
|
|
- // choosing node from which we will get configs (starting with controller)
|
|
|
- .flatMap(descr -> descr.controller != null
|
|
|
- ? Mono.just(descr.controller)
|
|
|
- : Mono.justOrEmpty(descr.nodes.stream().findFirst())
|
|
|
- )
|
|
|
- .flatMap(node -> ConfigRelatedInfo.extract(adminClient, node.id()))
|
|
|
- .map(info -> new ReactiveAdminClient(adminClient, info));
|
|
|
+ Mono<ConfigRelatedInfo> configRelatedInfoMono = ConfigRelatedInfo.extract(adminClient);
|
|
|
+ return configRelatedInfoMono.map(info -> new ReactiveAdminClient(adminClient, configRelatedInfoMono, info));
|
|
|
}
|
|
|
|
|
|
|
|
@@ -170,7 +175,7 @@ public class ReactiveAdminClient implements Closeable {
|
|
|
.doOnError(th -> !(th instanceof SecurityDisabledException)
|
|
|
&& !(th instanceof InvalidRequestException)
|
|
|
&& !(th instanceof UnsupportedVersionException),
|
|
|
- th -> log.warn("Error checking if security enabled", th))
|
|
|
+ th -> log.debug("Error checking if security enabled", th))
|
|
|
.onErrorReturn(false);
|
|
|
}
|
|
|
|
|
@@ -202,6 +207,8 @@ public class ReactiveAdminClient implements Closeable {
|
|
|
|
|
|
@Getter(AccessLevel.PACKAGE) // visible for testing
|
|
|
private final AdminClient client;
|
|
|
+ private final Mono<ConfigRelatedInfo> configRelatedInfoMono;
|
|
|
+
|
|
|
private volatile ConfigRelatedInfo configRelatedInfo;
|
|
|
|
|
|
public Set<SupportedFeature> getClusterFeatures() {
|
|
@@ -228,7 +235,7 @@ public class ReactiveAdminClient implements Closeable {
|
|
|
if (controller == null) {
|
|
|
return Mono.empty();
|
|
|
}
|
|
|
- return ConfigRelatedInfo.extract(client, controller.id())
|
|
|
+ return configRelatedInfoMono
|
|
|
.doOnNext(info -> this.configRelatedInfo = info)
|
|
|
.then();
|
|
|
}
|