|
@@ -13,6 +13,7 @@ import com.provectus.kafka.ui.service.ClustersStorage;
|
|
|
import com.provectus.kafka.ui.service.ReactiveAdminClient;
|
|
|
import java.io.Closeable;
|
|
|
import java.io.IOException;
|
|
|
+import java.time.Duration;
|
|
|
import java.util.HashMap;
|
|
|
import java.util.Map;
|
|
|
import java.util.Optional;
|
|
@@ -37,6 +38,7 @@ import reactor.core.publisher.Signal;
|
|
|
public class AuditService implements Closeable {
|
|
|
|
|
|
private static final Mono<AuthenticatedUser> NO_AUTH_USER = Mono.just(new AuthenticatedUser("Unknown", Set.of()));
|
|
|
+ private static final Duration BLOCK_TIMEOUT = Duration.ofSeconds(5);
|
|
|
|
|
|
private static final String DEFAULT_AUDIT_TOPIC_NAME = "__kui-audit-log";
|
|
|
private static final int DEFAULT_AUDIT_TOPIC_PARTITIONS = 1;
|
|
@@ -56,7 +58,7 @@ public class AuditService implements Closeable {
|
|
|
public AuditService(AdminClientService adminClientService, ClustersStorage clustersStorage) {
|
|
|
Map<String, AuditWriter> auditWriters = new HashMap<>();
|
|
|
for (var cluster : clustersStorage.getKafkaClusters()) {
|
|
|
- Supplier<ReactiveAdminClient> adminClientSupplier = () -> adminClientService.get(cluster).block();
|
|
|
+ Supplier<ReactiveAdminClient> adminClientSupplier = () -> adminClientService.get(cluster).block(BLOCK_TIMEOUT);
|
|
|
createAuditWriter(cluster, adminClientSupplier, () -> createProducer(cluster, AUDIT_PRODUCER_CONFIG))
|
|
|
.ifPresent(writer -> auditWriters.put(cluster.getName(), writer));
|
|
|
}
|
|
@@ -128,7 +130,7 @@ public class AuditService implements Closeable {
|
|
|
}
|
|
|
boolean topicExists;
|
|
|
try {
|
|
|
- topicExists = ac.listTopics(true).block().contains(auditTopicName);
|
|
|
+ topicExists = ac.listTopics(true).block(BLOCK_TIMEOUT).contains(auditTopicName);
|
|
|
} catch (Exception e) {
|
|
|
printAuditInitError(cluster, "Error checking audit topic existence", e);
|
|
|
return false;
|
|
@@ -146,7 +148,7 @@ public class AuditService implements Closeable {
|
|
|
.ifPresent(topicConfig::putAll);
|
|
|
|
|
|
log.info("Creating audit topic '{}' for cluster '{}'", auditTopicName, cluster.getName());
|
|
|
- ac.createTopic(auditTopicName, topicPartitions, null, topicConfig).block();
|
|
|
+ ac.createTopic(auditTopicName, topicPartitions, null, topicConfig).block(BLOCK_TIMEOUT);
|
|
|
log.info("Audit topic created for cluster '{}'", cluster.getName());
|
|
|
return true;
|
|
|
} catch (Exception e) {
|