|
@@ -35,24 +35,25 @@ public class AuditService implements Closeable {
|
|
|
private final Map<String, AuditWriter> auditWriters = new ConcurrentHashMap<>();
|
|
|
|
|
|
public AuditService(ClustersProperties clustersProperties,
|
|
|
- MessagesService messagesService,
|
|
|
AdminClientService adminClientService,
|
|
|
ClustersStorage clustersStorage) {
|
|
|
if (clustersProperties.getClusters() != null) {
|
|
|
for (var clusterProps : clustersProperties.getClusters()) {
|
|
|
var cluster = clustersStorage.getClusterByName(clusterProps.getName()).orElseThrow();
|
|
|
- initialize(
|
|
|
- cluster,
|
|
|
- adminClientService.get(cluster).block(),
|
|
|
- messagesService
|
|
|
- );
|
|
|
+ ReactiveAdminClient adminClient = null;
|
|
|
+ try {
|
|
|
+ adminClient = adminClientService.get(cluster).block();
|
|
|
+ } catch (Exception e) {
|
|
|
+ printAuditInitError(cluster, "Error connect to cluster", e);
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+ initialize(cluster, adminClient);
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
|
|
|
private void initialize(KafkaCluster cluster,
|
|
|
- ReactiveAdminClient ac,
|
|
|
- MessagesService messagesService) {
|
|
|
+ ReactiveAdminClient ac) {
|
|
|
var auditProps = cluster.getOriginalProperties().getAudit();
|
|
|
if (auditProps == null) {
|
|
|
return;
|
|
@@ -63,11 +64,12 @@ public class AuditService implements Closeable {
|
|
|
return;
|
|
|
}
|
|
|
String auditTopicName = Optional.ofNullable(auditProps.getTopic()).orElse(DEFAULT_AUDIT_TOPIC_NAME);
|
|
|
- KafkaProducer<byte[], byte[]> producer = null;
|
|
|
+ @Nullable KafkaProducer<byte[], byte[]> producer = null;
|
|
|
if (topicAudit && createTopicIfNeeded(cluster, ac, auditTopicName, auditProps)) {
|
|
|
- producer = messagesService.createProducer(cluster, Map.of());
|
|
|
+ producer = MessagesService.createProducer(cluster, Map.of());
|
|
|
}
|
|
|
auditWriters.put(cluster.getName(), new AuditWriter(auditTopicName, producer, consoleAudit));
|
|
|
+ log.info("Audit service initialized for cluster '{}'", cluster.getName());
|
|
|
}
|
|
|
|
|
|
/**
|