|
@@ -82,22 +82,34 @@ public class AuditService implements Closeable {
|
|
return Optional.empty();
|
|
return Optional.empty();
|
|
}
|
|
}
|
|
if (!topicAudit) {
|
|
if (!topicAudit) {
|
|
- // console logging only
|
|
|
|
- log.info("Audit service initialized for cluster (console only) '{}'", cluster.getName());
|
|
|
|
- return Optional.of(new AuditWriter(cluster.getName(), null, null, AUDIT_LOGGER));
|
|
|
|
|
|
+ log.info("Audit initialization finished for cluster '{}' (console only)", cluster.getName());
|
|
|
|
+ return Optional.of(consoleOnlyWriter(cluster));
|
|
}
|
|
}
|
|
String auditTopicName = Optional.ofNullable(auditProps.getTopic()).orElse(DEFAULT_AUDIT_TOPIC_NAME);
|
|
String auditTopicName = Optional.ofNullable(auditProps.getTopic()).orElse(DEFAULT_AUDIT_TOPIC_NAME);
|
|
- if (createTopicIfNeeded(cluster, acSupplier, auditTopicName, auditProps)) {
|
|
|
|
- log.info("Audit service initialized for cluster '{}'", cluster.getName());
|
|
|
|
- return Optional.of(
|
|
|
|
- new AuditWriter(cluster.getName(),
|
|
|
|
- auditTopicName,
|
|
|
|
- producerFactory.get(),
|
|
|
|
- consoleAudit ? AUDIT_LOGGER : null
|
|
|
|
- )
|
|
|
|
- );
|
|
|
|
|
|
+ boolean topicAuditCanBeDone = createTopicIfNeeded(cluster, acSupplier, auditTopicName, auditProps);
|
|
|
|
+ if (!topicAuditCanBeDone) {
|
|
|
|
+ if (consoleAudit) {
|
|
|
|
+ log.info(
|
|
|
|
+ "Audit initialization finished for cluster '{}' (console only, topic audit init failed)",
|
|
|
|
+ cluster.getName()
|
|
|
|
+ );
|
|
|
|
+ return Optional.of(consoleOnlyWriter(cluster));
|
|
|
|
+ }
|
|
|
|
+ return Optional.empty();
|
|
}
|
|
}
|
|
- return Optional.empty();
|
|
|
|
|
|
+ log.info("Audit initialization finished for cluster '{}'", cluster.getName());
|
|
|
|
+ return Optional.of(
|
|
|
|
+ new AuditWriter(
|
|
|
|
+ cluster.getName(),
|
|
|
|
+ auditTopicName,
|
|
|
|
+ producerFactory.get(),
|
|
|
|
+ consoleAudit ? AUDIT_LOGGER : null
|
|
|
|
+ )
|
|
|
|
+ );
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ private static AuditWriter consoleOnlyWriter(KafkaCluster cluster) {
|
|
|
|
+ return new AuditWriter(cluster.getName(), null, null, AUDIT_LOGGER);
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|
|
@@ -146,7 +158,7 @@ public class AuditService implements Closeable {
|
|
private static void printAuditInitError(KafkaCluster cluster, String errorMsg, Exception cause) {
|
|
private static void printAuditInitError(KafkaCluster cluster, String errorMsg, Exception cause) {
|
|
log.error("-----------------------------------------------------------------");
|
|
log.error("-----------------------------------------------------------------");
|
|
log.error(
|
|
log.error(
|
|
- "Error initializing Audit Service for cluster '{}'. Audit will be disabled. See error below: ",
|
|
|
|
|
|
+ "Error initializing Audit for cluster '{}'. Audit will be disabled. See error below: ",
|
|
cluster.getName()
|
|
cluster.getName()
|
|
);
|
|
);
|
|
log.error("{}", errorMsg, cause);
|
|
log.error("{}", errorMsg, cause);
|