Merge branch 'master' into audit-be-minor-refactoring
This commit is contained in:
commit
2d2da5a10e
1 changed files with 37 additions and 30 deletions
|
@ -15,6 +15,8 @@ import com.provectus.kafka.ui.exception.ValidationException;
|
|||
import com.provectus.kafka.ui.util.KafkaVersion;
|
||||
import com.provectus.kafka.ui.util.annotation.KafkaClientInternalsDependant;
|
||||
import java.io.Closeable;
|
||||
import java.time.Duration;
|
||||
import java.time.temporal.ChronoUnit;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.HashMap;
|
||||
|
@ -129,38 +131,41 @@ public class ReactiveAdminClient implements Closeable {
|
|||
Set<SupportedFeature> features,
|
||||
boolean topicDeletionIsAllowed) {
|
||||
|
||||
private static Mono<ConfigRelatedInfo> extract(AdminClient ac, int controllerId) {
|
||||
return loadBrokersConfig(ac, List.of(controllerId))
|
||||
.map(map -> map.isEmpty() ? List.<ConfigEntry>of() : map.get(controllerId))
|
||||
.flatMap(configs -> {
|
||||
String version = "1.0-UNKNOWN";
|
||||
boolean topicDeletionEnabled = true;
|
||||
for (ConfigEntry entry : configs) {
|
||||
if (entry.name().contains("inter.broker.protocol.version")) {
|
||||
version = entry.value();
|
||||
}
|
||||
if (entry.name().equals("delete.topic.enable")) {
|
||||
topicDeletionEnabled = Boolean.parseBoolean(entry.value());
|
||||
}
|
||||
}
|
||||
var builder = ConfigRelatedInfo.builder()
|
||||
.version(version)
|
||||
.topicDeletionIsAllowed(topicDeletionEnabled);
|
||||
return SupportedFeature.forVersion(ac, version)
|
||||
.map(features -> builder.features(features).build());
|
||||
});
|
||||
static final Duration UPDATE_DURATION = Duration.of(1, ChronoUnit.HOURS);
|
||||
|
||||
private static Mono<ConfigRelatedInfo> extract(AdminClient ac) {
|
||||
return ReactiveAdminClient.describeClusterImpl(ac, Set.of())
|
||||
.flatMap(desc -> {
|
||||
// choosing node from which we will get configs (starting with controller)
|
||||
var targetNodeId = Optional.ofNullable(desc.controller)
|
||||
.map(Node::id)
|
||||
.orElse(desc.getNodes().iterator().next().id());
|
||||
return loadBrokersConfig(ac, List.of(targetNodeId))
|
||||
.map(map -> map.isEmpty() ? List.<ConfigEntry>of() : map.get(targetNodeId))
|
||||
.flatMap(configs -> {
|
||||
String version = "1.0-UNKNOWN";
|
||||
boolean topicDeletionEnabled = true;
|
||||
for (ConfigEntry entry : configs) {
|
||||
if (entry.name().contains("inter.broker.protocol.version")) {
|
||||
version = entry.value();
|
||||
}
|
||||
if (entry.name().equals("delete.topic.enable")) {
|
||||
topicDeletionEnabled = Boolean.parseBoolean(entry.value());
|
||||
}
|
||||
}
|
||||
final String finalVersion = version;
|
||||
final boolean finalTopicDeletionEnabled = topicDeletionEnabled;
|
||||
return SupportedFeature.forVersion(ac, version)
|
||||
.map(features -> new ConfigRelatedInfo(finalVersion, features, finalTopicDeletionEnabled));
|
||||
});
|
||||
})
|
||||
.cache(UPDATE_DURATION);
|
||||
}
|
||||
}
|
||||
|
||||
public static Mono<ReactiveAdminClient> create(AdminClient adminClient) {
|
||||
return describeClusterImpl(adminClient, Set.of())
|
||||
// choosing node from which we will get configs (starting with controller)
|
||||
.flatMap(descr -> descr.controller != null
|
||||
? Mono.just(descr.controller)
|
||||
: Mono.justOrEmpty(descr.nodes.stream().findFirst())
|
||||
)
|
||||
.flatMap(node -> ConfigRelatedInfo.extract(adminClient, node.id()))
|
||||
.map(info -> new ReactiveAdminClient(adminClient, info));
|
||||
Mono<ConfigRelatedInfo> configRelatedInfoMono = ConfigRelatedInfo.extract(adminClient);
|
||||
return configRelatedInfoMono.map(info -> new ReactiveAdminClient(adminClient, configRelatedInfoMono, info));
|
||||
}
|
||||
|
||||
|
||||
|
@ -170,7 +175,7 @@ public class ReactiveAdminClient implements Closeable {
|
|||
.doOnError(th -> !(th instanceof SecurityDisabledException)
|
||||
&& !(th instanceof InvalidRequestException)
|
||||
&& !(th instanceof UnsupportedVersionException),
|
||||
th -> log.warn("Error checking if security enabled", th))
|
||||
th -> log.debug("Error checking if security enabled", th))
|
||||
.onErrorReturn(false);
|
||||
}
|
||||
|
||||
|
@ -202,6 +207,8 @@ public class ReactiveAdminClient implements Closeable {
|
|||
|
||||
@Getter(AccessLevel.PACKAGE) // visible for testing
|
||||
private final AdminClient client;
|
||||
private final Mono<ConfigRelatedInfo> configRelatedInfoMono;
|
||||
|
||||
private volatile ConfigRelatedInfo configRelatedInfo;
|
||||
|
||||
public Set<SupportedFeature> getClusterFeatures() {
|
||||
|
@ -228,7 +235,7 @@ public class ReactiveAdminClient implements Closeable {
|
|||
if (controller == null) {
|
||||
return Mono.empty();
|
||||
}
|
||||
return ConfigRelatedInfo.extract(client, controller.id())
|
||||
return configRelatedInfoMono
|
||||
.doOnNext(info -> this.configRelatedInfo = info)
|
||||
.then();
|
||||
}
|
||||
|
|
Loading…
Add table
Reference in a new issue