iliax 2 years ago
parent
commit
19ad8ba42c

+ 27 - 26
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/audit/AuditService.java

@@ -14,11 +14,11 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import javax.annotation.Nullable;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
 import org.springframework.stereotype.Service;
 
 @Slf4j
@@ -27,13 +27,15 @@ public class AuditService implements Closeable {
 
   private static final String DEFAULT_AUDIT_TOPIC_NAME = "__kui-audit-log";
   private static final int DEFAULT_AUDIT_TOPIC_PARTITIONS = 1;
-  private static final Map<String, String> DEFAULT_AUDIT_TOPIC_PROPERTIES = Map.of(
+  private static final Map<String, String> DEFAULT_AUDIT_TOPIC_CONFIG = Map.of(
       "retention.ms", String.valueOf(TimeUnit.DAYS.toMillis(7)),
-      "cleanup.policy", "delete",
-      "compression.type", "gzip"
+      "cleanup.policy", "delete"
+  );
+  private static final Map<String, Object> AUDIT_PRODUCER_CONFIG = Map.of(
+      ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip"
   );
 
-  private final Map<String, AuditWriter> auditWriters = new ConcurrentHashMap<>();
+  private final Map<String, AuditWriter> auditWriters = new HashMap<>();
 
   public AuditService(ClustersProperties clustersProperties,
                       AdminClientService adminClientService,
@@ -51,8 +53,7 @@ public class AuditService implements Closeable {
     }
   }
 
-  private void initialize(KafkaCluster cluster,
-                          ReactiveAdminClient ac) {
+  private void initialize(KafkaCluster cluster, ReactiveAdminClient ac) {
     var auditProps = cluster.getOriginalProperties().getAudit();
     if (auditProps == null) {
       return;
@@ -65,7 +66,7 @@ public class AuditService implements Closeable {
     String auditTopicName = Optional.ofNullable(auditProps.getTopic()).orElse(DEFAULT_AUDIT_TOPIC_NAME);
     @Nullable KafkaProducer<byte[], byte[]> producer = null;
     if (topicAudit && createTopicIfNeeded(cluster, ac, auditTopicName, auditProps)) {
-      producer = MessagesService.createProducer(cluster, Map.of());
+      producer = MessagesService.createProducer(cluster, AUDIT_PRODUCER_CONFIG);
     }
     auditWriters.put(cluster.getName(), new AuditWriter(auditTopicName, producer, consoleAudit));
     log.info("Audit service initialized for cluster '{}'", cluster.getName());
@@ -78,12 +79,6 @@ public class AuditService implements Closeable {
                                       ReactiveAdminClient ac,
                                       String auditTopicName,
                                       ClustersProperties.AuditProperties auditProps) {
-    int topicPartitions =
-        Optional.ofNullable(auditProps.getAuditTopicsPartitions()).orElse(DEFAULT_AUDIT_TOPIC_PARTITIONS);
-    Map<String, String> topicConfig = new HashMap<>(DEFAULT_AUDIT_TOPIC_PROPERTIES);
-    Optional.ofNullable(auditProps.getAuditTopicProperties())
-        .ifPresent(topicConfig::putAll);
-
     boolean topicExists;
     try {
       topicExists = ac.listTopics(false).block().contains(auditTopicName);
@@ -91,26 +86,32 @@ public class AuditService implements Closeable {
       printAuditInitError(cluster, "Error checking audit topic existence", e);
       return false;
     }
+    if (topicExists) {
+      return true;
+    }
+    try {
+      int topicPartitions =
+          Optional.ofNullable(auditProps.getAuditTopicsPartitions())
+              .orElse(DEFAULT_AUDIT_TOPIC_PARTITIONS);
 
-    if (!topicExists) {
-      try {
-        log.info("Creating audit topic '{}' for cluster '{}'", auditTopicName, cluster.getName());
-        ac.createTopic(auditTopicName, topicPartitions, null, topicConfig).block();
-        log.info("Audit topic created for cluster '{}'", cluster.getName());
-        return true;
-      } catch (Exception e) {
-        printAuditInitError(cluster, "Error creating topic '%s'".formatted(auditTopicName), e);
-        return false;
-      }
-    } else {
+      Map<String, String> topicConfig = new HashMap<>(DEFAULT_AUDIT_TOPIC_CONFIG);
+      Optional.ofNullable(auditProps.getAuditTopicProperties())
+          .ifPresent(topicConfig::putAll);
+
+      log.info("Creating audit topic '{}' for cluster '{}'", auditTopicName, cluster.getName());
+      ac.createTopic(auditTopicName, topicPartitions, null, topicConfig).block();
+      log.info("Audit topic created for cluster '{}'", cluster.getName());
       return true;
+    } catch (Exception e) {
+      printAuditInitError(cluster, "Error creating topic '%s'".formatted(auditTopicName), e);
+      return false;
     }
   }
 
   private void printAuditInitError(KafkaCluster cluster, String errorMsg, Exception cause) {
     log.error("-----------------------------------------------------------------");
     log.error(
-        "Error initializing AUDIT Service for cluster '{}'. Audit will be disabled. See error below: ",
+        "Error initializing Audit Service for cluster '{}'. Audit will be disabled. See error below: ",
         cluster.getName()
     );
     log.error("{}", errorMsg, cause);

+ 1 - 1
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/audit/AuditWriter.java

@@ -29,7 +29,7 @@ record AuditWriter(String targetTopic,
                    @Nullable KafkaProducer<byte[], byte[]> producer,
                    boolean logToConsole) implements Closeable {
 
-  //TODO: discuss AUDIT LOG FORMAT
+  //TODO: discuss AUDIT LOG FORMAT and name
   private static final Logger AUDIT_LOGGER = LoggerFactory.getLogger("audit");
 
   public void write(AccessContext ctx, AuthenticatedUser user, @Nullable Throwable th) {