ソースを参照

WIP: operations audit

iliax 2 年 前
コミット
e58ebfd87e

+ 12 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/ClustersProperties.java

@@ -51,6 +51,7 @@ public class ClustersProperties {
     List<Masking> masking;
     Long pollingThrottleRate;
     TruststoreConfig ssl;
+    AuditProperties audit;
   }
 
   @Data
@@ -143,6 +144,17 @@ public class ClustersProperties {
     }
   }
 
+  @Data
+  @NoArgsConstructor
+  @AllArgsConstructor
+  public static class AuditProperties {
+    String topic;
+    Integer audiTopicsPartitions;
+    Boolean disable;
+    Boolean enableConsoleAudit;
+    Map<String, String> auditTopicProperties;
+  }
+
   @PostConstruct
   public void validateAndSetDefaults() {
     if (clusters != null) {

+ 6 - 4
kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/TopicsController.java

@@ -143,13 +143,13 @@ public class TopicsController extends AbstractController implements TopicsApi {
   public Mono<ResponseEntity<TopicDetailsDTO>> getTopicDetails(
       String clusterName, String topicName, ServerWebExchange exchange) {
 
-    Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
+    var validatedAccess = accessControlService.withAccess(AccessContext.builder()
         .cluster(clusterName)
         .topic(topicName)
         .topicActions(VIEW)
         .build());
 
-    return validateAccess.then(
+    return validatedAccess.then(
         topicsService.getTopicDetails(getCluster(clusterName), topicName)
             .map(clusterMapper::toTopicDetails)
             .map(ResponseEntity::ok)
@@ -204,13 +204,15 @@ public class TopicsController extends AbstractController implements TopicsApi {
       String clusterName, String topicName, @Valid Mono<TopicUpdateDTO> topicUpdate,
       ServerWebExchange exchange) {
 
-    Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
+    var validatedAccess = accessControlService.withAccess(AccessContext.builder()
         .cluster(clusterName)
         .topic(topicName)
         .topicActions(VIEW, EDIT)
+        .operationDescription("Topic update")
+        .operationParams(topicUpdate)
         .build());
 
-    return validateAccess.then(
+    return validatedAccess.then(
         topicsService
             .updateTopic(getCluster(clusterName), topicName, topicUpdate)
             .map(clusterMapper::toTopic)

+ 23 - 1
kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/rbac/AccessContext.java

@@ -11,6 +11,7 @@ import com.provectus.kafka.ui.model.rbac.permission.TopicAction;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import lombok.Value;
 import org.springframework.util.Assert;
 
@@ -40,6 +41,9 @@ public class AccessContext {
 
   Collection<AclAction> aclActions;
 
+  String operationDescription;
+  Object operationParams;
+
   public static AccessContextBuilder builder() {
     return new AccessContextBuilder();
   }
@@ -59,6 +63,8 @@ public class AccessContext {
     private Collection<SchemaAction> schemaActions = Collections.emptySet();
     private Collection<KsqlAction> ksqlActions = Collections.emptySet();
     private Collection<AclAction> aclActions = Collections.emptySet();
+    String operationDescription;
+    Object operationParams;
 
     private AccessContextBuilder() {
     }
@@ -141,6 +147,21 @@ public class AccessContext {
       return this;
     }
 
+    public AccessContextBuilder operationDescription(String description) {
+      this.operationDescription = operationDescription;
+      return this;
+    }
+
+    public AccessContextBuilder operationParams(Object... operationParams) {
+      this.operationParams = operationParams;
+      return this;
+    }
+
+    public AccessContextBuilder operationParams(Map<String, Object> paramsMap) {
+      this.operationParams = paramsMap;
+      return this;
+    }
+
     public AccessContext build() {
       return new AccessContext(
           applicationConfigActions,
@@ -150,7 +171,8 @@ public class AccessContext {
           connect, connectActions,
           connector,
           schema, schemaActions,
-          ksqlActions, aclActions);
+          ksqlActions, aclActions,
+          operationDescription, operationParams);
     }
   }
 }

+ 5 - 1
kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/rbac/permission/PermissibleAction.java

@@ -1,4 +1,8 @@
 package com.provectus.kafka.ui.model.rbac.permission;
 
-public interface PermissibleAction {
+public sealed interface PermissibleAction permits
+    AclAction, ApplicationConfigAction,
+    ConsumerGroupAction, SchemaAction,
+    ConnectAction, ClusterConfigAction,
+    KsqlAction, TopicAction {
 }

+ 1 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/AdminClientService.java

@@ -1,6 +1,7 @@
 package com.provectus.kafka.ui.service;
 
 import com.provectus.kafka.ui.model.KafkaCluster;
+import java.util.Optional;
 import reactor.core.publisher.Mono;
 
 public interface AdminClientService {

+ 12 - 7
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/MessagesService.java

@@ -127,13 +127,7 @@ public class MessagesService {
             msg.getValueSerde().get()
         );
 
-    Properties properties = new Properties();
-    SslPropertiesUtil.addKafkaSslProperties(cluster.getOriginalProperties().getSsl(), properties);
-    properties.putAll(cluster.getProperties());
-    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.getBootstrapServers());
-    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
-    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
-    try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(properties)) {
+    try (KafkaProducer<byte[], byte[]> producer = createProducer(cluster, Map.of())) {
       ProducerRecord<byte[], byte[]> producerRecord = producerRecordCreator.create(
           topicDescription.name(),
           msg.getPartition(),
@@ -155,6 +149,17 @@ public class MessagesService {
     }
   }
 
+  public KafkaProducer<byte[], byte[]> createProducer(KafkaCluster cluster, Map<String, Object> additionalProps) {
+    Properties properties = new Properties();
+    SslPropertiesUtil.addKafkaSslProperties(cluster.getOriginalProperties().getSsl(), properties);
+    properties.putAll(cluster.getProperties());
+    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.getBootstrapServers());
+    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
+    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
+    properties.putAll(additionalProps);
+    return new KafkaProducer<>(properties);
+  }
+
   public Flux<TopicMessageEventDTO> loadMessages(KafkaCluster cluster, String topic,
                                                  ConsumerPosition consumerPosition,
                                                  @Nullable String query,

+ 98 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/audit/AuditService.java

@@ -0,0 +1,98 @@
+package com.provectus.kafka.ui.service.audit;
+
+import com.fasterxml.jackson.databind.json.JsonMapper;
+import com.provectus.kafka.ui.config.ClustersProperties;
+import com.provectus.kafka.ui.config.auth.AuthenticatedUser;
+import com.provectus.kafka.ui.exception.CustomBaseException;
+import com.provectus.kafka.ui.exception.ValidationException;
+import com.provectus.kafka.ui.model.KafkaCluster;
+import com.provectus.kafka.ui.model.rbac.AccessContext;
+import com.provectus.kafka.ui.model.rbac.Resource;
+import com.provectus.kafka.ui.model.rbac.permission.PermissibleAction;
+import com.provectus.kafka.ui.service.AdminClientService;
+import com.provectus.kafka.ui.service.ClustersStorage;
+import com.provectus.kafka.ui.service.MessagesService;
+import com.provectus.kafka.ui.service.ReactiveAdminClient;
+import java.io.Closeable;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import lombok.RequiredArgsConstructor;
+import lombok.SneakyThrows;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.security.access.AccessDeniedException;
+import org.springframework.stereotype.Service;
+
+@Service
+@RequiredArgsConstructor
+public class AuditService implements Closeable {
+
+  private static final Logger AUDIT_LOGGER = LoggerFactory.getLogger("audit");
+
+  private static final String DEFAULT_AUDIT_TOPIC_NAME = "__kui-audit-log ";
+  private static final int DEFAULT_AUDIT_TOPIC_PARTITIONS = 1;
+  private static final Map<String, String> DEFAULT_AUDIT_TOPIC_PROPERTIES = Map.of(
+      "retention.ms", String.valueOf(TimeUnit.DAYS.toMillis(7)),
+      "cleanup.policy", "delete",
+      "compression.type", "gzip"
+  );
+
+
+  private final Map<String, AuditWriter> auditWriters = new ConcurrentHashMap<>();
+
+  private final ClustersStorage clustersStorage;
+
+  public AuditService(ClustersProperties clustersProperties,
+                      MessagesService messagesService,
+                      AdminClientService adminClientService,
+                      ClustersStorage clustersStorage) {
+    this.clustersStorage = clustersStorage;
+    if (clustersProperties.getClusters() != null) {
+      for (var clusterProps : clustersProperties.getClusters()) {
+        createTopicAndProducer(
+            adminClientService,
+            clustersStorage.getClusterByName(clusterProps.getName()).orElseThrow(),
+            messagesService
+        );
+      }
+    }
+  }
+
+  public void sendAuditRecord(AccessContext ctx, AuthenticatedUser user) {
+    if (ctx.getCluster() != null) {
+      if (auditWriters.containsKey(ctx.getCluster())) {
+        auditWriters.get(ctx.getCluster()).write(ctx, user);
+      }
+    } else {
+      //TODO: app config change
+    }
+  }
+
+  public void sendAuditRecord(AccessContext ctx, AuthenticatedUser user, Throwable th) {
+    if (ctx.getCluster() != null) {
+      if (auditWriters.containsKey(ctx.getCluster())) {
+        auditWriters.get(ctx.getCluster()).write(ctx, user, th);
+      }
+    } else {
+      //TODO: app config change
+    }
+  }
+
+  private void createTopicAndProducer(KafkaCluster c,
+                                      ReactiveAdminClient ac,
+                                      MessagesService ms) {
+    var props = c.getOriginalProperties();
+    if (props.getAudit() != null) {
+
+    }
+  }
+
+  @Override
+  public void close() throws IOException {
+
+  }
+}

+ 86 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/audit/AuditWriter.java

@@ -0,0 +1,86 @@
+package com.provectus.kafka.ui.service.audit;
+
+import com.fasterxml.jackson.databind.json.JsonMapper;
+import com.provectus.kafka.ui.config.auth.AuthenticatedUser;
+import com.provectus.kafka.ui.exception.CustomBaseException;
+import com.provectus.kafka.ui.exception.ValidationException;
+import com.provectus.kafka.ui.model.KafkaCluster;
+import com.provectus.kafka.ui.model.rbac.AccessContext;
+import com.provectus.kafka.ui.model.rbac.Resource;
+import com.provectus.kafka.ui.model.rbac.permission.PermissibleAction;
+import com.provectus.kafka.ui.service.AdminClientService;
+import com.provectus.kafka.ui.service.MessagesService;
+import com.provectus.kafka.ui.service.ReactiveAdminClient;
+import java.nio.charset.StandardCharsets;
+import lombok.SneakyThrows;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.springframework.security.access.AccessDeniedException;
+
+record AuditWriter(String targetTopic,
+                   ReactiveAdminClient adminClient,
+                   KafkaProducer<byte[], byte[]> producer,
+                   boolean logToConsole) {
+
+  static AuditWriter createAndInit(KafkaCluster c,
+                                   ReactiveAdminClient ac,
+                                   MessagesService ms) {
+
+  }
+
+  public void write(AccessContext ctx, AuthenticatedUser user) {
+
+  }
+
+  public void write(AccessContext ctx, AuthenticatedUser user, Throwable th) {
+
+  }
+
+
+  record AuditRecord(String timestamp,
+                     String userPrincipal, //TODO: discuss - rename to username?
+                     String clusterName,
+                     AuditResource resources,
+                     String operation,
+                     OperationResult result,
+                     Object params
+  ) {
+    static final JsonMapper MAPPER = new JsonMapper();
+
+    @SneakyThrows
+    byte[] toJson() {
+      return MAPPER.writeValueAsString(this).getBytes(StandardCharsets.UTF_8);
+    }
+  }
+
+  record AuditResource(PermissibleAction accessType, Resource type, Object id) {
+  }
+
+  record OperationResult(boolean success, OperationError error) {
+  }
+
+  static OperationResult successResult() {
+    return new OperationResult(true, null);
+  }
+
+  static OperationResult errorResult(Throwable th) {
+    OperationError err = OperationError.UNEXPECTED_ERROR;
+    if (th instanceof AccessDeniedException) {
+      err = OperationError.ACCESS_DENIED;
+    } else if (th instanceof ValidationException) {
+      err = OperationError.VALIDATION_ERROR;
+    } else if (th instanceof CustomBaseException) {
+      err = OperationError.EXECUTION_ERROR;
+    }
+    return new OperationResult(false, err);
+  }
+
+  enum OperationError {
+    ACCESS_DENIED,
+    VALIDATION_ERROR,
+    EXECUTION_ERROR,
+    UNEXPECTED_ERROR
+  }
+
+}
+
+

+ 58 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/rbac/AccessControlService.java

@@ -17,6 +17,7 @@ import com.provectus.kafka.ui.model.rbac.permission.ConnectAction;
 import com.provectus.kafka.ui.model.rbac.permission.ConsumerGroupAction;
 import com.provectus.kafka.ui.model.rbac.permission.SchemaAction;
 import com.provectus.kafka.ui.model.rbac.permission.TopicAction;
+import com.provectus.kafka.ui.service.audit.AuditService;
 import com.provectus.kafka.ui.service.rbac.extractor.CognitoAuthorityExtractor;
 import com.provectus.kafka.ui.service.rbac.extractor.GithubAuthorityExtractor;
 import com.provectus.kafka.ui.service.rbac.extractor.GoogleAuthorityExtractor;
@@ -44,6 +45,8 @@ import org.springframework.security.oauth2.client.registration.InMemoryReactiveC
 import org.springframework.stereotype.Service;
 import org.springframework.util.Assert;
 import reactor.core.publisher.Mono;
+import reactor.util.function.Tuple2;
+import reactor.util.function.Tuples;
 
 @Service
 @RequiredArgsConstructor
@@ -51,6 +54,8 @@ import reactor.core.publisher.Mono;
 @Slf4j
 public class AccessControlService {
 
+  private final AuditService auditService;
+
   @Nullable
   private final InMemoryReactiveClientRegistrationRepository clientRegistrationRepository;
   private final RoleBasedAccessControlProperties properties;
@@ -92,6 +97,59 @@ public class AccessControlService {
     }
   }
 
+  @RequiredArgsConstructor
+  public class OperationContext {
+
+    private final AccessContext accessContext;
+
+    public <T> Mono<T> then(Mono<T> mono) {
+      return validateAccessAndReturnUser(accessContext)
+          .flatMap(t -> {
+            if (t.getT2()) {
+              return mono
+                  .doOnSuccess(r -> auditService.sendAuditRecord(accessContext, t.getT1()))
+                  .doOnError(th -> true, th -> auditService.sendAuditRecord(accessContext, t.getT1(), th));
+            } else {
+              var ex = new AccessDeniedException("Access denied");
+              auditService.sendAuditRecord(accessContext, t.getT1(), ex);
+              return Mono.error(ex);
+            }
+          });
+    }
+
+    public <T> Mono<T> thenReturn(T objToReturn) {
+      return then(Mono.justOrEmpty(objToReturn));
+    }
+  }
+
+  public OperationContext withAccess(AccessContext context) {
+    return new OperationContext(context);
+  }
+
+  // [user, access granted flag]
+  private Mono<Tuple2<AuthenticatedUser, Boolean>> validateAccessAndReturnUser(AccessContext context) {
+    if (!rbacEnabled) {
+      return Mono.just(Tuples.of(new AuthenticatedUser("Unknown", Set.of()), true));
+    }
+
+    return getUser()
+        .map(user -> {
+          boolean accessGranted =
+              isApplicationConfigAccessible(context, user)
+                  && isClusterAccessible(context, user)
+                  && isClusterConfigAccessible(context, user)
+                  && isTopicAccessible(context, user)
+                  && isConsumerGroupAccessible(context, user)
+                  && isConnectAccessible(context, user)
+                  && isConnectorAccessible(context, user) // TODO connector selectors
+                  && isSchemaAccessible(context, user)
+                  && isKsqlAccessible(context, user)
+                  && isAclAccessible(context, user);
+
+          return Tuples.of(user, accessGranted);
+        });
+  }
+
   public Mono<Void> validateAccess(AccessContext context) {
     if (!rbacEnabled) {
       return Mono.empty();