|
@@ -13,6 +13,7 @@ import com.provectus.kafka.ui.service.ClustersStorage;
|
|
|
import com.provectus.kafka.ui.service.ReactiveAdminClient;
|
|
|
import java.io.Closeable;
|
|
|
import java.io.IOException;
|
|
|
+import java.time.Duration;
|
|
|
import java.util.HashMap;
|
|
|
import java.util.Map;
|
|
|
import java.util.Optional;
|
|
@@ -37,6 +38,7 @@ import reactor.core.publisher.Signal;
|
|
|
public class AuditService implements Closeable {
|
|
|
|
|
|
private static final Mono<AuthenticatedUser> NO_AUTH_USER = Mono.just(new AuthenticatedUser("Unknown", Set.of()));
|
|
|
+ private static final Duration BLOCK_TIMEOUT = Duration.ofSeconds(5);
|
|
|
|
|
|
private static final String DEFAULT_AUDIT_TOPIC_NAME = "__kui-audit-log";
|
|
|
private static final int DEFAULT_AUDIT_TOPIC_PARTITIONS = 1;
|
|
@@ -56,14 +58,8 @@ public class AuditService implements Closeable {
|
|
|
public AuditService(AdminClientService adminClientService, ClustersStorage clustersStorage) {
|
|
|
Map<String, AuditWriter> auditWriters = new HashMap<>();
|
|
|
for (var cluster : clustersStorage.getKafkaClusters()) {
|
|
|
- ReactiveAdminClient adminClient;
|
|
|
- try {
|
|
|
- adminClient = adminClientService.get(cluster).block();
|
|
|
- } catch (Exception e) {
|
|
|
- printAuditInitError(cluster, "Error connect to cluster", e);
|
|
|
- continue;
|
|
|
- }
|
|
|
- createAuditWriter(cluster, adminClient, () -> createProducer(cluster, AUDIT_PRODUCER_CONFIG))
|
|
|
+ Supplier<ReactiveAdminClient> adminClientSupplier = () -> adminClientService.get(cluster).block(BLOCK_TIMEOUT);
|
|
|
+ createAuditWriter(cluster, adminClientSupplier, () -> createProducer(cluster, AUDIT_PRODUCER_CONFIG))
|
|
|
.ifPresent(writer -> auditWriters.put(cluster.getName(), writer));
|
|
|
}
|
|
|
this.auditWriters = auditWriters;
|
|
@@ -76,7 +72,7 @@ public class AuditService implements Closeable {
|
|
|
|
|
|
@VisibleForTesting
|
|
|
static Optional<AuditWriter> createAuditWriter(KafkaCluster cluster,
|
|
|
- ReactiveAdminClient ac,
|
|
|
+ Supplier<ReactiveAdminClient> acSupplier,
|
|
|
Supplier<KafkaProducer<byte[], byte[]>> producerFactory) {
|
|
|
var auditProps = cluster.getOriginalProperties().getAudit();
|
|
|
if (auditProps == null) {
|
|
@@ -87,32 +83,54 @@ public class AuditService implements Closeable {
|
|
|
if (!topicAudit && !consoleAudit) {
|
|
|
return Optional.empty();
|
|
|
}
|
|
|
+ if (!topicAudit) {
|
|
|
+ log.info("Audit initialization finished for cluster '{}' (console only)", cluster.getName());
|
|
|
+ return Optional.of(consoleOnlyWriter(cluster));
|
|
|
+ }
|
|
|
String auditTopicName = Optional.ofNullable(auditProps.getTopic()).orElse(DEFAULT_AUDIT_TOPIC_NAME);
|
|
|
- @Nullable KafkaProducer<byte[], byte[]> producer = null;
|
|
|
- if (topicAudit && createTopicIfNeeded(cluster, ac, auditTopicName, auditProps)) {
|
|
|
- producer = producerFactory.get();
|
|
|
+ boolean topicAuditCanBeDone = createTopicIfNeeded(cluster, acSupplier, auditTopicName, auditProps);
|
|
|
+ if (!topicAuditCanBeDone) {
|
|
|
+ if (consoleAudit) {
|
|
|
+ log.info(
|
|
|
+ "Audit initialization finished for cluster '{}' (console only, topic audit init failed)",
|
|
|
+ cluster.getName()
|
|
|
+ );
|
|
|
+ return Optional.of(consoleOnlyWriter(cluster));
|
|
|
+ }
|
|
|
+ return Optional.empty();
|
|
|
}
|
|
|
- log.info("Audit service initialized for cluster '{}'", cluster.getName());
|
|
|
+ log.info("Audit initialization finished for cluster '{}'", cluster.getName());
|
|
|
return Optional.of(
|
|
|
new AuditWriter(
|
|
|
cluster.getName(),
|
|
|
auditTopicName,
|
|
|
- producer,
|
|
|
+ producerFactory.get(),
|
|
|
consoleAudit ? AUDIT_LOGGER : null
|
|
|
)
|
|
|
);
|
|
|
}
|
|
|
|
|
|
+ private static AuditWriter consoleOnlyWriter(KafkaCluster cluster) {
|
|
|
+ return new AuditWriter(cluster.getName(), null, null, AUDIT_LOGGER);
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* return true if topic created/existing and producing can be enabled.
|
|
|
*/
|
|
|
private static boolean createTopicIfNeeded(KafkaCluster cluster,
|
|
|
- ReactiveAdminClient ac,
|
|
|
+ Supplier<ReactiveAdminClient> acSupplier,
|
|
|
String auditTopicName,
|
|
|
ClustersProperties.AuditProperties auditProps) {
|
|
|
+ ReactiveAdminClient ac;
|
|
|
+ try {
|
|
|
+ ac = acSupplier.get();
|
|
|
+ } catch (Exception e) {
|
|
|
+ printAuditInitError(cluster, "Error while connecting to the cluster", e);
|
|
|
+ return false;
|
|
|
+ }
|
|
|
boolean topicExists;
|
|
|
try {
|
|
|
- topicExists = ac.listTopics(true).block().contains(auditTopicName);
|
|
|
+ topicExists = ac.listTopics(true).block(BLOCK_TIMEOUT).contains(auditTopicName);
|
|
|
} catch (Exception e) {
|
|
|
printAuditInitError(cluster, "Error checking audit topic existence", e);
|
|
|
return false;
|
|
@@ -130,7 +148,7 @@ public class AuditService implements Closeable {
|
|
|
.ifPresent(topicConfig::putAll);
|
|
|
|
|
|
log.info("Creating audit topic '{}' for cluster '{}'", auditTopicName, cluster.getName());
|
|
|
- ac.createTopic(auditTopicName, topicPartitions, null, topicConfig).block();
|
|
|
+ ac.createTopic(auditTopicName, topicPartitions, null, topicConfig).block(BLOCK_TIMEOUT);
|
|
|
log.info("Audit topic created for cluster '{}'", cluster.getName());
|
|
|
return true;
|
|
|
} catch (Exception e) {
|
|
@@ -142,7 +160,7 @@ public class AuditService implements Closeable {
|
|
|
private static void printAuditInitError(KafkaCluster cluster, String errorMsg, Exception cause) {
|
|
|
log.error("-----------------------------------------------------------------");
|
|
|
log.error(
|
|
|
- "Error initializing Audit Service for cluster '{}'. Audit will be disabled. See error below: ",
|
|
|
+ "Error initializing Audit for cluster '{}'. Audit will be disabled. See error below: ",
|
|
|
cluster.getName()
|
|
|
);
|
|
|
log.error("{}", errorMsg, cause);
|