iliax há 2 anos atrás
pai
commit
c07fd6bc4f

+ 78 - 49
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/audit/AuditService.java

@@ -1,40 +1,30 @@
 package com.provectus.kafka.ui.service.audit;
 
-import com.fasterxml.jackson.databind.json.JsonMapper;
 import com.provectus.kafka.ui.config.ClustersProperties;
 import com.provectus.kafka.ui.config.auth.AuthenticatedUser;
-import com.provectus.kafka.ui.exception.CustomBaseException;
-import com.provectus.kafka.ui.exception.ValidationException;
 import com.provectus.kafka.ui.model.KafkaCluster;
 import com.provectus.kafka.ui.model.rbac.AccessContext;
-import com.provectus.kafka.ui.model.rbac.Resource;
-import com.provectus.kafka.ui.model.rbac.permission.PermissibleAction;
 import com.provectus.kafka.ui.service.AdminClientService;
 import com.provectus.kafka.ui.service.ClustersStorage;
 import com.provectus.kafka.ui.service.MessagesService;
 import com.provectus.kafka.ui.service.ReactiveAdminClient;
 import java.io.Closeable;
 import java.io.IOException;
-import java.nio.charset.StandardCharsets;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
-import lombok.RequiredArgsConstructor;
-import lombok.SneakyThrows;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.security.access.AccessDeniedException;
+import javax.annotation.Nullable;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.producer.KafkaProducer;
 import org.springframework.stereotype.Service;
 
+@Slf4j
 @Service
-@RequiredArgsConstructor
 public class AuditService implements Closeable {
 
-  private static final Logger AUDIT_LOGGER = LoggerFactory.getLogger("audit");
-
-  private static final String DEFAULT_AUDIT_TOPIC_NAME = "__kui-audit-log ";
+  private static final String DEFAULT_AUDIT_TOPIC_NAME = "__kui-audit-log";
   private static final int DEFAULT_AUDIT_TOPIC_PARTITIONS = 1;
   private static final Map<String, String> DEFAULT_AUDIT_TOPIC_PROPERTIES = Map.of(
       "retention.ms", String.valueOf(TimeUnit.DAYS.toMillis(7)),
@@ -42,68 +32,107 @@ public class AuditService implements Closeable {
       "compression.type", "gzip"
   );
 
-
   private final Map<String, AuditWriter> auditWriters = new ConcurrentHashMap<>();
 
-  private final ClustersStorage clustersStorage;
-
   public AuditService(ClustersProperties clustersProperties,
                       MessagesService messagesService,
                       AdminClientService adminClientService,
                       ClustersStorage clustersStorage) {
-    this.clustersStorage = clustersStorage;
     if (clustersProperties.getClusters() != null) {
       for (var clusterProps : clustersProperties.getClusters()) {
-        var c = clustersStorage.getClusterByName(clusterProps.getName()).orElseThrow();
-        createTopicAndProducer(
-            c,
-            adminClientService.get(c).block(),
+        var cluster = clustersStorage.getClusterByName(clusterProps.getName()).orElseThrow();
+        initialize(
+            cluster,
+            adminClientService.get(cluster).block(),
             messagesService
         );
       }
     }
   }
 
-  public void sendAuditRecord(AccessContext ctx, AuthenticatedUser user) {
-    if (ctx.getCluster() != null) {
-      if (auditWriters.containsKey(ctx.getCluster())) {
-        auditWriters.get(ctx.getCluster()).write(ctx, user);
-      }
-    } else {
-      //TODO: app config change
+  private void initialize(KafkaCluster cluster,
+                          ReactiveAdminClient ac,
+                          MessagesService messagesService) {
+    var auditProps = cluster.getOriginalProperties().getAudit();
+    if (auditProps == null) {
+      return;
+    }
+    boolean topicAudit = Optional.ofNullable(auditProps.getTopicAuditEnabled()).orElse(false);
+    boolean consoleAudit = Optional.ofNullable(auditProps.getConsoleAuditEnabled()).orElse(false);
+    if (!topicAudit && !consoleAudit) {
+      return;
+    }
+    String auditTopicName = Optional.ofNullable(auditProps.getTopic()).orElse(DEFAULT_AUDIT_TOPIC_NAME);
+    KafkaProducer<byte[], byte[]> producer = null;
+    if (topicAudit && createTopicIfNeeded(cluster, ac, auditTopicName, auditProps)) {
+      producer = messagesService.createProducer(cluster, Map.of());
     }
+    auditWriters.put(cluster.getName(), new AuditWriter(auditTopicName, producer, consoleAudit));
   }
 
-  public void sendAuditRecord(AccessContext ctx, AuthenticatedUser user, Throwable th) {
-    if (ctx.getCluster() != null) {
-      if (auditWriters.containsKey(ctx.getCluster())) {
-        auditWriters.get(ctx.getCluster()).write(ctx, user, th);
+  /**
+   * @return true if topic created/existing and producing can be enabled
+   */
+  private boolean createTopicIfNeeded(KafkaCluster cluster,
+                                      ReactiveAdminClient ac,
+                                      String auditTopicName,
+                                      ClustersProperties.AuditProperties auditProps) {
+    int topicPartitions =
+        Optional.ofNullable(auditProps.getAuditTopicsPartitions()).orElse(DEFAULT_AUDIT_TOPIC_PARTITIONS);
+    Map<String, String> topicConfig = new HashMap<>(DEFAULT_AUDIT_TOPIC_PROPERTIES);
+    Optional.ofNullable(auditProps.getAuditTopicProperties())
+        .ifPresent(topicConfig::putAll);
+
+    boolean topicExists;
+    try {
+      topicExists = ac.listTopics(false).block().contains(auditTopicName);
+    } catch (Exception e) {
+      printAuditInitError(cluster, "Error checking audit topic existence", e);
+      return false;
+    }
+
+    if (!topicExists) {
+      try {
+        log.info("Creating audit topic '{}' for cluster '{}'", auditTopicName, cluster.getName());
+        ac.createTopic(auditTopicName, topicPartitions, null, topicConfig).block();
+        log.info("Audit topic created for cluster '{}'", cluster.getName());
+        return true;
+      } catch (Exception e) {
+        printAuditInitError(cluster, "Error creating topic '%s'".formatted(auditTopicName), e);
+        return false;
       }
     } else {
-      //TODO: app config change
+      return true;
     }
   }
 
-  private void createTopicAndProducer(KafkaCluster c,
-                                      ReactiveAdminClient ac,
-                                      MessagesService ms) {
-    var props = c.getOriginalProperties();
+  private void printAuditInitError(KafkaCluster cluster, String errorMsg, Exception cause) {
+    log.error("-----------------------------------------------------------------");
+    log.error(
+        "Error initializing AUDIT Service for cluster '{}'. Audit will be disabled. See error below: ",
+        cluster.getName()
+    );
+    log.error("{}", errorMsg, cause);
+    log.error("-----------------------------------------------------------------");
+  }
 
-    if (props.getAudit() != null) {
-      var auditProps = props.getAudit();
-      boolean topicAudit = Optional.ofNullable(auditProps.getTopicAuditEnabled()).orElse(false);
-      boolean consoleAudit = Optional.ofNullable(auditProps.getConsoleAuditEnabled()).orElse(false);
-      String auditTopicName = Optional.ofNullable(auditProps.getTopic()).orElse(DEFAULT_AUDIT_TOPIC_NAME);
-      int auditTopicPartis = Optional.ofNullable(auditProps.getAuditTopicsPartitions()).orElse(DEFAULT_AUDIT_TOPIC_PARTITIONS);
-      Map<String, String> topicCreationProps = new HashMap<>(DEFAULT_AUDIT_TOPIC_PROPERTIES);
-      Optional.ofNullable(auditProps.getAuditTopicProperties())
-          .ifPresent(topicCreationProps::putAll);
+  public void sendAuditRecord(AccessContext ctx, AuthenticatedUser user) {
+    sendAuditRecord(ctx, user, null);
+  }
 
+  public void sendAuditRecord(AccessContext ctx, AuthenticatedUser user, @Nullable Throwable th) {
+    if (ctx.getCluster() != null) {
+      var writer = auditWriters.get(ctx.getCluster());
+      if (writer != null) {
+        writer.write(ctx, user, th);
+      }
+    } else {
+      //TODO: discuss app config - where to log?
     }
   }
 
   @Override
   public void close() throws IOException {
-
+    auditWriters.values().forEach(AuditWriter::close);
   }
 }

+ 75 - 41
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/audit/AuditWriter.java

@@ -4,84 +4,118 @@ import com.fasterxml.jackson.databind.json.JsonMapper;
 import com.provectus.kafka.ui.config.auth.AuthenticatedUser;
 import com.provectus.kafka.ui.exception.CustomBaseException;
 import com.provectus.kafka.ui.exception.ValidationException;
-import com.provectus.kafka.ui.model.KafkaCluster;
 import com.provectus.kafka.ui.model.rbac.AccessContext;
 import com.provectus.kafka.ui.model.rbac.Resource;
 import com.provectus.kafka.ui.model.rbac.permission.PermissibleAction;
-import com.provectus.kafka.ui.service.AdminClientService;
-import com.provectus.kafka.ui.service.MessagesService;
-import com.provectus.kafka.ui.service.ReactiveAdminClient;
+import java.io.Closeable;
 import java.nio.charset.StandardCharsets;
+import java.time.Instant;
+import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Map;
 import java.util.Optional;
-import java.util.function.Supplier;
+import javax.annotation.Nullable;
 import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.springframework.security.access.AccessDeniedException;
 
+@Slf4j
 record AuditWriter(String targetTopic,
-                   ReactiveAdminClient adminClient,
-                   KafkaProducer<byte[], byte[]> producer,
-                   boolean logToConsole) {
-
-  static AuditWriter createAndInit(ReactiveAdminClient adminClient,
-                                   Supplier<KafkaProducer<byte[], byte[]>> producerSupplier,
-                                   boolean topicAudit,
-                                   boolean consoleAudit,
-                                   String auditTopicName,
-                                   int auditTopicPartis,
-                                   Map<String, String> topicCreationProps) {
-    KafkaProducer<byte[], byte[]> producer;
-    if (topicAudit) {
-      producer = producerSupplier.get();
-    }
+                   @Nullable KafkaProducer<byte[], byte[]> producer,
+                   boolean logToConsole) implements Closeable {
+
+  //TODO: discuss AUDIT LOG FORMAT
+  private static final Logger AUDIT_LOGGER = LoggerFactory.getLogger("audit");
+
+  public void write(AccessContext ctx, AuthenticatedUser user, @Nullable Throwable th) {
+    write(
+        new AuditRecord(
+            DateTimeFormatter.ISO_INSTANT.format(Instant.now()),
+            user.principal(),
+            ctx.getCluster(),
+            getAccessedResources(ctx),
+            ctx.getOperationDescription(),
+            th == null ? OperationResult.successful() : OperationResult.error(th),
+            ctx.getOperationParams()
+        )
+    );
   }
 
-
-  private void createTopic(ReactiveAdminClient adminClient,
-                   String auditTopicName,
-                   int parts,
-                   Map<String, String> topicCreationProps) {
-
+  private List<AuditResource> getAccessedResources(AccessContext ctx) {
+    List<AuditResource> resources = new ArrayList<>();
+    ctx.getClusterConfigActions()
+        .forEach(a -> resources.add(new AuditResource(a, Resource.CLUSTERCONFIG, null)));
+    ctx.getTopicActions()
+        .forEach(a -> resources.add(new AuditResource(a, Resource.TOPIC, nameId(ctx.getTopic()))));
+    ctx.getConsumerGroupActions()
+        .forEach(a -> resources.add(new AuditResource(a, Resource.CONSUMER, nameId(ctx.getConsumerGroup()))));
+    ctx.getConnectActions()
+        .forEach(a -> resources.add(new AuditResource(a, Resource.CONNECT, nameId(ctx.getConnect()))));
+    ctx.getSchemaActions()
+        .forEach(a -> resources.add(new AuditResource(a, Resource.SCHEMA, nameId(ctx.getSchema()))));
+    ctx.getKsqlActions()
+        .forEach(a -> resources.add(new AuditResource(a, Resource.KSQL, null)));
+    ctx.getAclActions()
+        .forEach(a -> resources.add(new AuditResource(a, Resource.ACL, null)));
+    return resources;
   }
 
-  public void write(AccessContext ctx, AuthenticatedUser user) {
-
+  @Nullable
+  private Map<String, Object> nameId(@Nullable String name) {
+    return name != null ? Map.of("name", name) : null;
   }
 
-  public void write(AccessContext ctx, AuthenticatedUser user, Throwable th) {
-
+  private void write(AuditRecord rec) {
+    String json = rec.toJson();
+    if (logToConsole) {
+      AUDIT_LOGGER.info(json);
+    }
+    if (producer != null) {
+      producer.send(
+          new ProducerRecord<>(targetTopic, null, json.getBytes(StandardCharsets.UTF_8)),
+          (metadata, ex) -> {
+            log.warn("Error writing Audit record", ex);
+          });
+    }
   }
 
+  @Override
+  public void close() {
+    Optional.ofNullable(producer).ifPresent(KafkaProducer::close);
+  }
 
   record AuditRecord(String timestamp,
                      String userPrincipal,  //TODO: discuss - rename to username?
                      String clusterName,
-                     AuditResource resources,
+                     List<AuditResource> resources,
                      String operation,
                      OperationResult result,
-                     Object params
-  ) {
+                     @Nullable Object params) {
+
     static final JsonMapper MAPPER = new JsonMapper();
 
     @SneakyThrows
-    byte[] toJson() {
-      return MAPPER.writeValueAsString(this).getBytes(StandardCharsets.UTF_8);
+    String toJson() {
+      return MAPPER.writeValueAsString(this);
     }
   }
 
-  record AuditResource(PermissibleAction accessType, Resource type, Object id) {
+  record AuditResource(PermissibleAction accessType, Resource type, @Nullable Object id) {
   }
 
   record OperationResult(boolean success, OperationError error) {
 
-    static OperationResult successResult() {
+    static OperationResult successful() {
       return new OperationResult(true, null);
     }
 
-    static OperationResult errorResult(Throwable th) {
-      OperationError err = OperationError.UNEXPECTED_ERROR;
+    static OperationResult error(Throwable th) {
+      OperationError err = OperationError.UNRECOGNIZED_ERROR;
       if (th instanceof AccessDeniedException) {
         err = OperationError.ACCESS_DENIED;
       } else if (th instanceof ValidationException) {
@@ -96,7 +130,7 @@ record AuditWriter(String targetTopic,
       ACCESS_DENIED,
       VALIDATION_ERROR,
       EXECUTION_ERROR,
-      UNEXPECTED_ERROR
+      UNRECOGNIZED_ERROR
     }
   }