|
@@ -56,14 +56,8 @@ public class AuditService implements Closeable {
|
|
|
public AuditService(AdminClientService adminClientService, ClustersStorage clustersStorage) {
|
|
|
Map<String, AuditWriter> auditWriters = new HashMap<>();
|
|
|
for (var cluster : clustersStorage.getKafkaClusters()) {
|
|
|
- ReactiveAdminClient adminClient;
|
|
|
- try {
|
|
|
- adminClient = adminClientService.get(cluster).block();
|
|
|
- } catch (Exception e) {
|
|
|
- printAuditInitError(cluster, "Error connect to cluster", e);
|
|
|
- continue;
|
|
|
- }
|
|
|
- createAuditWriter(cluster, adminClient, () -> createProducer(cluster, AUDIT_PRODUCER_CONFIG))
|
|
|
+ Supplier<ReactiveAdminClient> adminClientSupplier = () -> adminClientService.get(cluster).block();
|
|
|
+ createAuditWriter(cluster, adminClientSupplier, () -> createProducer(cluster, AUDIT_PRODUCER_CONFIG))
|
|
|
.ifPresent(writer -> auditWriters.put(cluster.getName(), writer));
|
|
|
}
|
|
|
this.auditWriters = auditWriters;
|
|
@@ -76,7 +70,7 @@ public class AuditService implements Closeable {
|
|
|
|
|
|
@VisibleForTesting
|
|
|
static Optional<AuditWriter> createAuditWriter(KafkaCluster cluster,
|
|
|
- ReactiveAdminClient ac,
|
|
|
+ Supplier<ReactiveAdminClient> acSupplier,
|
|
|
Supplier<KafkaProducer<byte[], byte[]>> producerFactory) {
|
|
|
var auditProps = cluster.getOriginalProperties().getAudit();
|
|
|
if (auditProps == null) {
|
|
@@ -87,29 +81,39 @@ public class AuditService implements Closeable {
|
|
|
if (!topicAudit && !consoleAudit) {
|
|
|
return Optional.empty();
|
|
|
}
|
|
|
+ if (!topicAudit) {
|
|
|
+ // console logging only
|
|
|
+ log.info("Audit service initialized for cluster (console only) '{}'", cluster.getName());
|
|
|
+ return Optional.of(new AuditWriter(cluster.getName(), null, null, AUDIT_LOGGER));
|
|
|
+ }
|
|
|
String auditTopicName = Optional.ofNullable(auditProps.getTopic()).orElse(DEFAULT_AUDIT_TOPIC_NAME);
|
|
|
- @Nullable KafkaProducer<byte[], byte[]> producer = null;
|
|
|
- if (topicAudit && createTopicIfNeeded(cluster, ac, auditTopicName, auditProps)) {
|
|
|
- producer = producerFactory.get();
|
|
|
+ if (createTopicIfNeeded(cluster, acSupplier, auditTopicName, auditProps)) {
|
|
|
+ log.info("Audit service initialized for cluster '{}'", cluster.getName());
|
|
|
+ return Optional.of(
|
|
|
+ new AuditWriter(cluster.getName(),
|
|
|
+ auditTopicName,
|
|
|
+ producerFactory.get(),
|
|
|
+ consoleAudit ? AUDIT_LOGGER : null
|
|
|
+ )
|
|
|
+ );
|
|
|
}
|
|
|
- log.info("Audit service initialized for cluster '{}'", cluster.getName());
|
|
|
- return Optional.of(
|
|
|
- new AuditWriter(
|
|
|
- cluster.getName(),
|
|
|
- auditTopicName,
|
|
|
- producer,
|
|
|
- consoleAudit ? AUDIT_LOGGER : null
|
|
|
- )
|
|
|
- );
|
|
|
+ return Optional.empty();
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* return true if topic created/existing and producing can be enabled.
|
|
|
*/
|
|
|
private static boolean createTopicIfNeeded(KafkaCluster cluster,
|
|
|
- ReactiveAdminClient ac,
|
|
|
+ Supplier<ReactiveAdminClient> acSupplier,
|
|
|
String auditTopicName,
|
|
|
ClustersProperties.AuditProperties auditProps) {
|
|
|
+ ReactiveAdminClient ac;
|
|
|
+ try {
|
|
|
+ ac = acSupplier.get();
|
|
|
+ } catch (Exception e) {
|
|
|
+ printAuditInitError(cluster, "Error connect to cluster", e);
|
|
|
+ return false;
|
|
|
+ }
|
|
|
boolean topicExists;
|
|
|
try {
|
|
|
topicExists = ac.listTopics(true).block().contains(auditTopicName);
|