iliax 2 years ago
parent
commit
b8d6afd056
17 changed files with 572 additions and 414 deletions
  1. 16 5
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/AclsController.java
  2. 32 24
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/ApplicationConfigController.java
  3. 47 29
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/BrokersController.java
  4. 11 5
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/ClustersController.java
  5. 33 22
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/ConsumerGroupsController.java
  6. 62 42
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/KafkaConnectController.java
  7. 37 25
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/KsqlController.java
  8. 21 15
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/MessagesController.java
  9. 64 35
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/SchemasController.java
  10. 84 53
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/TopicsController.java
  11. 5 5
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/rbac/AccessContext.java
  12. 96 0
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/audit/AuditRecord.java
  13. 47 9
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/audit/AuditService.java
  14. 12 87
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/audit/AuditWriter.java
  15. 0 56
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/rbac/AccessControlService.java
  16. 3 1
      kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/SchemaRegistryPaginationTest.java
  17. 2 1
      kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/TopicsServicePaginationTest.java

+ 16 - 5
kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/AclsController.java

@@ -8,6 +8,7 @@ import com.provectus.kafka.ui.model.KafkaAclResourceTypeDTO;
 import com.provectus.kafka.ui.model.rbac.AccessContext;
 import com.provectus.kafka.ui.model.rbac.permission.AclAction;
 import com.provectus.kafka.ui.service.acl.AclsService;
+import com.provectus.kafka.ui.service.audit.AuditService;
 import com.provectus.kafka.ui.service.rbac.AccessControlService;
 import java.util.Optional;
 import lombok.RequiredArgsConstructor;
@@ -26,6 +27,7 @@ public class AclsController extends AbstractController implements AclsApi {
 
   private final AclsService aclsService;
   private final AccessControlService accessControlService;
+  private final AuditService auditService;
 
   @Override
   public Mono<ResponseEntity<Void>> createAcl(String clusterName, Mono<KafkaAclDTO> kafkaAclDto,
@@ -33,13 +35,15 @@ public class AclsController extends AbstractController implements AclsApi {
     AccessContext context = AccessContext.builder()
         .cluster(clusterName)
         .aclActions(AclAction.EDIT)
+        .auditOperation("createAcl")
         .build();
 
     return accessControlService.validateAccess(context)
-        .then(kafkaAclDto)
-        .map(ClusterMapper::toAclBinding)
-        .flatMap(binding -> aclsService.createAcl(getCluster(clusterName), binding))
-        .thenReturn(ResponseEntity.ok().build());
+        .<ResponseEntity<Void>>then(
+            kafkaAclDto.map(ClusterMapper::toAclBinding)
+                .flatMap(binding -> aclsService.createAcl(getCluster(clusterName), binding))
+                .thenReturn(ResponseEntity.ok().build())
+        ).doOnEach(sig -> auditService.audit(context, sig));
   }
 
   @Override
@@ -48,12 +52,14 @@ public class AclsController extends AbstractController implements AclsApi {
     AccessContext context = AccessContext.builder()
         .cluster(clusterName)
         .aclActions(AclAction.EDIT)
+        .auditOperation("deleteAcl")
         .build();
 
     return accessControlService.validateAccess(context)
         .then(kafkaAclDto)
         .map(ClusterMapper::toAclBinding)
         .flatMap(binding -> aclsService.deleteAcl(getCluster(clusterName), binding))
+        .doOnEach(sig -> auditService.audit(context, sig))
         .thenReturn(ResponseEntity.ok().build());
   }
 
@@ -66,6 +72,7 @@ public class AclsController extends AbstractController implements AclsApi {
     AccessContext context = AccessContext.builder()
         .cluster(clusterName)
         .aclActions(AclAction.VIEW)
+        .auditOperation("listAcls")
         .build();
 
     var resourceType = Optional.ofNullable(resourceTypeDto)
@@ -83,7 +90,7 @@ public class AclsController extends AbstractController implements AclsApi {
             ResponseEntity.ok(
                 aclsService.listAcls(getCluster(clusterName), filter)
                     .map(ClusterMapper::toKafkaAclDto)))
-    );
+    ).doOnEach(sig -> auditService.audit(context, sig));
   }
 
   @Override
@@ -91,12 +98,14 @@ public class AclsController extends AbstractController implements AclsApi {
     AccessContext context = AccessContext.builder()
         .cluster(clusterName)
         .aclActions(AclAction.VIEW)
+        .auditOperation("getAclAsCsv")
         .build();
 
     return accessControlService.validateAccess(context).then(
         aclsService.getAclAsCsvString(getCluster(clusterName))
             .map(ResponseEntity::ok)
             .flatMap(Mono::just)
+            .doOnEach(sig -> auditService.audit(context, sig))
     );
   }
 
@@ -105,11 +114,13 @@ public class AclsController extends AbstractController implements AclsApi {
     AccessContext context = AccessContext.builder()
         .cluster(clusterName)
         .aclActions(AclAction.EDIT)
+        .auditOperation("syncAclsCsv")
         .build();
 
     return accessControlService.validateAccess(context)
         .then(csvMono)
         .flatMap(csv -> aclsService.syncAclWithAclCsv(getCluster(clusterName), csv))
+        .doOnEach(sig -> auditService.audit(context, sig))
         .thenReturn(ResponseEntity.ok().build());
   }
 }

+ 32 - 24
kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/ApplicationConfigController.java

@@ -15,6 +15,7 @@ import com.provectus.kafka.ui.model.UploadedFileInfoDTO;
 import com.provectus.kafka.ui.model.rbac.AccessContext;
 import com.provectus.kafka.ui.service.ApplicationInfoService;
 import com.provectus.kafka.ui.service.KafkaClusterFactory;
+import com.provectus.kafka.ui.service.audit.AuditService;
 import com.provectus.kafka.ui.service.rbac.AccessControlService;
 import com.provectus.kafka.ui.util.ApplicationRestarter;
 import com.provectus.kafka.ui.util.DynamicConfigOperations;
@@ -55,6 +56,7 @@ public class ApplicationConfigController implements ApplicationConfigApi {
   private final ApplicationRestarter restarter;
   private final KafkaClusterFactory kafkaClusterFactory;
   private final ApplicationInfoService applicationInfoService;
+  private final AuditService auditService;
 
   @Override
   public Mono<ResponseEntity<ApplicationInfoDTO>> getApplicationInfo(ServerWebExchange exchange) {
@@ -63,62 +65,68 @@ public class ApplicationConfigController implements ApplicationConfigApi {
 
   @Override
   public Mono<ResponseEntity<ApplicationConfigDTO>> getCurrentConfig(ServerWebExchange exchange) {
-    return accessControlService
-        .validateAccess(
-            AccessContext.builder()
-                .applicationConfigActions(VIEW)
-                .build()
-        )
+    var context = AccessContext.builder()
+        .applicationConfigActions(VIEW)
+        .auditOperation("getCurrentConfig")
+        .build();
+    return accessControlService.validateAccess(context)
         .then(Mono.fromSupplier(() -> ResponseEntity.ok(
             new ApplicationConfigDTO()
                 .properties(MAPPER.toDto(dynamicConfigOperations.getCurrentProperties()))
-        )));
+        )))
+        .doOnEach(sig -> auditService.audit(context, sig));
   }
 
   @Override
   public Mono<ResponseEntity<Void>> restartWithConfig(Mono<RestartRequestDTO> restartRequestDto,
                                                       ServerWebExchange exchange) {
-    return accessControlService
-        .validateAccess(
-            AccessContext.builder()
-                .applicationConfigActions(EDIT)
-                .build()
-        )
+    var context =  AccessContext.builder()
+        .applicationConfigActions(EDIT)
+        .auditOperation("restartWithConfig")
+        .build();
+    return accessControlService.validateAccess(context)
         .then(restartRequestDto)
-        .map(dto -> {
+        .<ResponseEntity<Void>>map(dto -> {
           dynamicConfigOperations.persist(MAPPER.fromDto(dto.getConfig().getProperties()));
           restarter.requestRestart();
           return ResponseEntity.ok().build();
-        });
+        })
+        .doOnEach(sig -> auditService.audit(context, sig));
   }
 
   @Override
   public Mono<ResponseEntity<UploadedFileInfoDTO>> uploadConfigRelatedFile(Flux<Part> fileFlux,
                                                                            ServerWebExchange exchange) {
-    return accessControlService
-        .validateAccess(
-            AccessContext.builder()
-                .applicationConfigActions(EDIT)
-                .build()
-        )
+    var context = AccessContext.builder()
+        .applicationConfigActions(EDIT)
+        .auditOperation("uploadConfigRelatedFile")
+        .build();
+    return accessControlService.validateAccess(context)
         .then(fileFlux.single())
         .flatMap(file ->
             dynamicConfigOperations.uploadConfigRelatedFile((FilePart) file)
                 .map(path -> new UploadedFileInfoDTO().location(path.toString()))
-                .map(ResponseEntity::ok));
+                .map(ResponseEntity::ok))
+        .doOnEach(sig -> auditService.audit(context, sig));
   }
 
   @Override
   public Mono<ResponseEntity<ApplicationConfigValidationDTO>> validateConfig(Mono<ApplicationConfigDTO> configDto,
                                                                              ServerWebExchange exchange) {
-    return configDto
+    var context = AccessContext.builder()
+        .applicationConfigActions(EDIT)
+        .auditOperation("validateConfig")
+        .build();
+    return accessControlService.validateAccess(context)
+        .then(configDto)
         .flatMap(config -> {
           PropertiesStructure propertiesStructure = MAPPER.fromDto(config.getProperties());
           ClustersProperties clustersProperties = propertiesStructure.getKafka();
           return validateClustersConfig(clustersProperties)
               .map(validations -> new ApplicationConfigValidationDTO().clusters(validations));
         })
-        .map(ResponseEntity::ok);
+        .map(ResponseEntity::ok)
+        .doOnEach(sig -> auditService.audit(context, sig));
   }
 
   private Mono<Map<String, ClusterConfigValidationDTO>> validateClustersConfig(

+ 47 - 29
kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/BrokersController.java

@@ -11,8 +11,10 @@ import com.provectus.kafka.ui.model.BrokersLogdirsDTO;
 import com.provectus.kafka.ui.model.rbac.AccessContext;
 import com.provectus.kafka.ui.model.rbac.permission.ClusterConfigAction;
 import com.provectus.kafka.ui.service.BrokerService;
+import com.provectus.kafka.ui.service.audit.AuditService;
 import com.provectus.kafka.ui.service.rbac.AccessControlService;
 import java.util.List;
+import java.util.Map;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.http.ResponseEntity;
@@ -27,61 +29,73 @@ import reactor.core.publisher.Mono;
 public class BrokersController extends AbstractController implements BrokersApi {
   private final BrokerService brokerService;
   private final ClusterMapper clusterMapper;
+
+  private final AuditService auditService;
   private final AccessControlService accessControlService;
 
   @Override
   public Mono<ResponseEntity<Flux<BrokerDTO>>> getBrokers(String clusterName,
                                                           ServerWebExchange exchange) {
-    Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
+    var context = AccessContext.builder()
         .cluster(clusterName)
-        .build());
+        .auditOperation("getBrokers")
+        .build();
 
     var job = brokerService.getBrokers(getCluster(clusterName)).map(clusterMapper::toBrokerDto);
-
-    return validateAccess.thenReturn(ResponseEntity.ok(job));
+    return accessControlService.validateAccess(context)
+        .thenReturn(ResponseEntity.ok(job))
+        .doOnEach(sig -> auditService.audit(context, sig));
   }
 
   @Override
   public Mono<ResponseEntity<BrokerMetricsDTO>> getBrokersMetrics(String clusterName, Integer id,
                                                                   ServerWebExchange exchange) {
-    Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
+    var context = AccessContext.builder()
         .cluster(clusterName)
-        .build());
+        .auditOperation("getBrokersMetrics")
+        .build();
 
-    return validateAccess.then(
-        brokerService.getBrokerMetrics(getCluster(clusterName), id)
-            .map(clusterMapper::toBrokerMetrics)
-            .map(ResponseEntity::ok)
-            .onErrorReturn(ResponseEntity.notFound().build())
-    );
+    return accessControlService.validateAccess(context)
+        .then(
+            brokerService.getBrokerMetrics(getCluster(clusterName), id)
+                .map(clusterMapper::toBrokerMetrics)
+                .map(ResponseEntity::ok)
+                .onErrorReturn(ResponseEntity.notFound().build())
+        )
+        .doOnEach(sig -> auditService.audit(context, sig));
   }
 
   @Override
   public Mono<ResponseEntity<Flux<BrokersLogdirsDTO>>> getAllBrokersLogdirs(String clusterName,
                                                                             List<Integer> brokers,
                                                                             ServerWebExchange exchange) {
-    Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
+    var context = AccessContext.builder()
         .cluster(clusterName)
-        .build());
-
-    return validateAccess.thenReturn(ResponseEntity.ok(
-        brokerService.getAllBrokersLogdirs(getCluster(clusterName), brokers)));
+        .auditOperation("getAllBrokersLogdirs")
+        .operationParams(Map.of("brokerIds", brokers))
+        .build();
+    return accessControlService.validateAccess(context)
+        .thenReturn(ResponseEntity.ok(
+            brokerService.getAllBrokersLogdirs(getCluster(clusterName), brokers)))
+        .doOnEach(sig -> auditService.audit(context, sig));
   }
 
   @Override
   public Mono<ResponseEntity<Flux<BrokerConfigDTO>>> getBrokerConfig(String clusterName,
                                                                      Integer id,
                                                                      ServerWebExchange exchange) {
-    Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
+    var context = AccessContext.builder()
         .cluster(clusterName)
         .clusterConfigActions(ClusterConfigAction.VIEW)
-        .build());
+        .auditOperation("getBrokerConfig")
+        .operationParams("brokerId", id)
+        .build();
 
-    return validateAccess.thenReturn(
+    return accessControlService.validateAccess(context).thenReturn(
         ResponseEntity.ok(
             brokerService.getBrokerConfig(getCluster(clusterName), id)
                 .map(clusterMapper::toBrokerConfig))
-    );
+    ).doOnEach(sig -> auditService.audit(context, sig));
   }
 
   @Override
@@ -89,16 +103,18 @@ public class BrokersController extends AbstractController implements BrokersApi
                                                                      Integer id,
                                                                      Mono<BrokerLogdirUpdateDTO> brokerLogdir,
                                                                      ServerWebExchange exchange) {
-    Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
+    var context = AccessContext.builder()
         .cluster(clusterName)
         .clusterConfigActions(ClusterConfigAction.VIEW, ClusterConfigAction.EDIT)
-        .build());
+        .auditOperation("updateBrokerTopicPartitionLogDir")
+        .operationParams("brokerId", id)
+        .build();
 
-    return validateAccess.then(
+    return accessControlService.validateAccess(context).then(
         brokerLogdir
             .flatMap(bld -> brokerService.updateBrokerLogDir(getCluster(clusterName), id, bld))
             .map(ResponseEntity::ok)
-    );
+    ).doOnEach(sig -> auditService.audit(context, sig));
   }
 
   @Override
@@ -107,16 +123,18 @@ public class BrokersController extends AbstractController implements BrokersApi
                                                              String name,
                                                              Mono<BrokerConfigItemDTO> brokerConfig,
                                                              ServerWebExchange exchange) {
-    Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
+    var context = AccessContext.builder()
         .cluster(clusterName)
         .clusterConfigActions(ClusterConfigAction.VIEW, ClusterConfigAction.EDIT)
-        .build());
+        .auditOperation("updateBrokerConfigByName")
+        .operationParams(Map.of("brokerId", id))
+        .build();
 
-    return validateAccess.then(
+    return accessControlService.validateAccess(context).then(
         brokerConfig
             .flatMap(bci -> brokerService.updateBrokerConfigByName(
                 getCluster(clusterName), id, name, bci.getValue()))
             .map(ResponseEntity::ok)
-    );
+    ).doOnEach(sig -> auditService.audit(context, sig));
   }
 }

+ 11 - 5
kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/ClustersController.java

@@ -6,6 +6,7 @@ import com.provectus.kafka.ui.model.ClusterMetricsDTO;
 import com.provectus.kafka.ui.model.ClusterStatsDTO;
 import com.provectus.kafka.ui.model.rbac.AccessContext;
 import com.provectus.kafka.ui.service.ClusterService;
+import com.provectus.kafka.ui.service.audit.AuditService;
 import com.provectus.kafka.ui.service.rbac.AccessControlService;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
@@ -21,6 +22,7 @@ import reactor.core.publisher.Mono;
 public class ClustersController extends AbstractController implements ClustersApi {
   private final ClusterService clusterService;
   private final AccessControlService accessControlService;
+  private final AuditService auditService;
 
   @Override
   public Mono<ResponseEntity<Flux<ClusterDTO>>> getClusters(ServerWebExchange exchange) {
@@ -35,6 +37,7 @@ public class ClustersController extends AbstractController implements ClustersAp
                                                                    ServerWebExchange exchange) {
     AccessContext context = AccessContext.builder()
         .cluster(clusterName)
+        .auditOperation("getClusterMetrics")
         .build();
 
     return accessControlService.validateAccess(context)
@@ -42,7 +45,8 @@ public class ClustersController extends AbstractController implements ClustersAp
             clusterService.getClusterMetrics(getCluster(clusterName))
                 .map(ResponseEntity::ok)
                 .onErrorReturn(ResponseEntity.notFound().build())
-        );
+        )
+        .doOnEach(sig -> auditService.audit(context, sig));
   }
 
   @Override
@@ -50,6 +54,7 @@ public class ClustersController extends AbstractController implements ClustersAp
                                                                ServerWebExchange exchange) {
     AccessContext context = AccessContext.builder()
         .cluster(clusterName)
+        .auditOperation("getClusterStats")
         .build();
 
     return accessControlService.validateAccess(context)
@@ -57,7 +62,8 @@ public class ClustersController extends AbstractController implements ClustersAp
             clusterService.getClusterStats(getCluster(clusterName))
                 .map(ResponseEntity::ok)
                 .onErrorReturn(ResponseEntity.notFound().build())
-        );
+        )
+        .doOnEach(sig -> auditService.audit(context, sig));
   }
 
   @Override
@@ -66,11 +72,11 @@ public class ClustersController extends AbstractController implements ClustersAp
 
     AccessContext context = AccessContext.builder()
         .cluster(clusterName)
+        .auditOperation("updateClusterInfo")
         .build();
 
     return accessControlService.validateAccess(context)
-        .then(
-            clusterService.updateCluster(getCluster(clusterName)).map(ResponseEntity::ok)
-        );
+        .then(clusterService.updateCluster(getCluster(clusterName)).map(ResponseEntity::ok))
+        .doOnEach(sig -> auditService.audit(context, sig));
   }
 }

+ 33 - 22
kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/ConsumerGroupsController.java

@@ -19,6 +19,7 @@ import com.provectus.kafka.ui.model.rbac.AccessContext;
 import com.provectus.kafka.ui.model.rbac.permission.TopicAction;
 import com.provectus.kafka.ui.service.ConsumerGroupService;
 import com.provectus.kafka.ui.service.OffsetsResetService;
+import com.provectus.kafka.ui.service.audit.AuditService;
 import com.provectus.kafka.ui.service.rbac.AccessControlService;
 import java.util.Map;
 import java.util.Optional;
@@ -42,6 +43,7 @@ public class ConsumerGroupsController extends AbstractController implements Cons
   private final ConsumerGroupService consumerGroupService;
   private final OffsetsResetService offsetsResetService;
   private final AccessControlService accessControlService;
+  private final AuditService auditService;
 
   @Value("${consumer.groups.page.size:25}")
   private int defaultConsumerGroupsPageSize;
@@ -50,44 +52,47 @@ public class ConsumerGroupsController extends AbstractController implements Cons
   public Mono<ResponseEntity<Void>> deleteConsumerGroup(String clusterName,
                                                         String id,
                                                         ServerWebExchange exchange) {
-    Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
+    var context = AccessContext.builder()
         .cluster(clusterName)
         .consumerGroup(id)
         .consumerGroupActions(DELETE)
-        .build());
+        .auditOperation("deleteConsumerGroup")
+        .build();
 
-    return validateAccess.then(
-        consumerGroupService.deleteConsumerGroupById(getCluster(clusterName), id)
-            .thenReturn(ResponseEntity.ok().build())
-    );
+    return accessControlService.validateAccess(context)
+        .then(consumerGroupService.deleteConsumerGroupById(getCluster(clusterName), id))
+        .doOnEach(sig -> auditService.audit(context, sig))
+        .thenReturn(ResponseEntity.ok().build());
   }
 
   @Override
   public Mono<ResponseEntity<ConsumerGroupDetailsDTO>> getConsumerGroup(String clusterName,
                                                                         String consumerGroupId,
                                                                         ServerWebExchange exchange) {
-    Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
+    var context = AccessContext.builder()
         .cluster(clusterName)
         .consumerGroup(consumerGroupId)
         .consumerGroupActions(VIEW)
-        .build());
+        .auditOperation("getConsumerGroup")
+        .build();
 
-    return validateAccess.then(
-        consumerGroupService.getConsumerGroupDetail(getCluster(clusterName), consumerGroupId)
+    return accessControlService.validateAccess(context)
+        .then(consumerGroupService.getConsumerGroupDetail(getCluster(clusterName), consumerGroupId)
             .map(ConsumerGroupMapper::toDetailsDto)
-            .map(ResponseEntity::ok)
-    );
+            .map(ResponseEntity::ok))
+        .doOnEach(sig -> auditService.audit(context, sig));
   }
 
   @Override
   public Mono<ResponseEntity<Flux<ConsumerGroupDTO>>> getTopicConsumerGroups(String clusterName,
                                                                              String topicName,
                                                                              ServerWebExchange exchange) {
-    Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
+    var context = AccessContext.builder()
         .cluster(clusterName)
         .topic(topicName)
         .topicActions(TopicAction.VIEW)
-        .build());
+        .auditOperation("getTopicConsumerGroups")
+        .build();
 
     Mono<ResponseEntity<Flux<ConsumerGroupDTO>>> job =
         consumerGroupService.getConsumerGroupsForTopic(getCluster(clusterName), topicName)
@@ -99,7 +104,9 @@ public class ConsumerGroupsController extends AbstractController implements Cons
             .map(ResponseEntity::ok)
             .switchIfEmpty(Mono.just(ResponseEntity.notFound().build()));
 
-    return validateAccess.then(job);
+    return accessControlService.validateAccess(context)
+        .then(job)
+        .doOnEach(sig -> auditService.audit(context, sig));
   }
 
   @Override
@@ -112,12 +119,13 @@ public class ConsumerGroupsController extends AbstractController implements Cons
       SortOrderDTO sortOrderDto,
       ServerWebExchange exchange) {
 
-    Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
+    var context = AccessContext.builder()
         .cluster(clusterName)
         // consumer group access validation is within the service
-        .build());
+        .auditOperation("getConsumerGroupsPage")
+        .build();
 
-    return validateAccess.then(
+    return accessControlService.validateAccess(context).then(
         consumerGroupService.getConsumerGroupsPage(
                 getCluster(clusterName),
                 Optional.ofNullable(page).filter(i -> i > 0).orElse(1),
@@ -128,7 +136,7 @@ public class ConsumerGroupsController extends AbstractController implements Cons
             )
             .map(this::convertPage)
             .map(ResponseEntity::ok)
-    );
+    ).doOnEach(sig -> auditService.audit(context, sig));
   }
 
   @Override
@@ -137,12 +145,13 @@ public class ConsumerGroupsController extends AbstractController implements Cons
                                                               Mono<ConsumerGroupOffsetsResetDTO> resetDto,
                                                               ServerWebExchange exchange) {
     return resetDto.flatMap(reset -> {
-      Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
+      var context = AccessContext.builder()
           .cluster(clusterName)
           .topic(reset.getTopic())
           .topicActions(TopicAction.VIEW)
           .consumerGroupActions(RESET_OFFSETS)
-          .build());
+          .auditOperation("resetConsumerGroupOffsets")
+          .build();
 
       Supplier<Mono<Void>> mono = () -> {
         var cluster = getCluster(clusterName);
@@ -182,7 +191,9 @@ public class ConsumerGroupsController extends AbstractController implements Cons
         }
       };
 
-      return validateAccess.then(mono.get());
+      return accessControlService.validateAccess(context)
+          .then(mono.get())
+          .doOnEach(sig -> auditService.audit(context, sig));
     }).thenReturn(ResponseEntity.ok().build());
   }
 

+ 62 - 42
kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/KafkaConnectController.java

@@ -14,6 +14,7 @@ import com.provectus.kafka.ui.model.TaskDTO;
 import com.provectus.kafka.ui.model.rbac.AccessContext;
 import com.provectus.kafka.ui.model.rbac.permission.ConnectAction;
 import com.provectus.kafka.ui.service.KafkaConnectService;
+import com.provectus.kafka.ui.service.audit.AuditService;
 import com.provectus.kafka.ui.service.rbac.AccessControlService;
 import java.util.Comparator;
 import java.util.Map;
@@ -32,6 +33,7 @@ import reactor.core.publisher.Mono;
 public class KafkaConnectController extends AbstractController implements KafkaConnectApi {
   private final KafkaConnectService kafkaConnectService;
   private final AccessControlService accessControlService;
+  private final AuditService auditService;
 
   @Override
   public Mono<ResponseEntity<Flux<ConnectDTO>>> getConnects(String clusterName,
@@ -47,15 +49,16 @@ public class KafkaConnectController extends AbstractController implements KafkaC
   public Mono<ResponseEntity<Flux<String>>> getConnectors(String clusterName, String connectName,
                                                           ServerWebExchange exchange) {
 
-    Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
+    var context = AccessContext.builder()
         .cluster(clusterName)
         .connect(connectName)
         .connectActions(ConnectAction.VIEW)
-        .build());
+        .auditOperation("getConnectors")
+        .build();
 
-    return validateAccess.thenReturn(
-        ResponseEntity.ok(kafkaConnectService.getConnectorNames(getCluster(clusterName), connectName))
-    );
+    return accessControlService.validateAccess(context)
+        .thenReturn(ResponseEntity.ok(kafkaConnectService.getConnectorNames(getCluster(clusterName), connectName)))
+        .doOnEach(sig -> auditService.audit(context, sig));
   }
 
   @Override
@@ -63,16 +66,17 @@ public class KafkaConnectController extends AbstractController implements KafkaC
                                                             @Valid Mono<NewConnectorDTO> connector,
                                                             ServerWebExchange exchange) {
 
-    Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
+    var context = AccessContext.builder()
         .cluster(clusterName)
         .connect(connectName)
         .connectActions(ConnectAction.VIEW, ConnectAction.CREATE)
-        .build());
+        .auditOperation("createConnector")
+        .build();
 
-    return validateAccess.then(
+    return accessControlService.validateAccess(context).then(
         kafkaConnectService.createConnector(getCluster(clusterName), connectName, connector)
             .map(ResponseEntity::ok)
-    );
+    ).doOnEach(sig -> auditService.audit(context, sig));
   }
 
   @Override
@@ -80,17 +84,18 @@ public class KafkaConnectController extends AbstractController implements KafkaC
                                                          String connectorName,
                                                          ServerWebExchange exchange) {
 
-    Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
+    var context = AccessContext.builder()
         .cluster(clusterName)
         .connect(connectName)
         .connectActions(ConnectAction.VIEW)
         .connector(connectorName)
-        .build());
+        .auditOperation("getConnector")
+        .build();
 
-    return validateAccess.then(
+    return accessControlService.validateAccess(context).then(
         kafkaConnectService.getConnector(getCluster(clusterName), connectName, connectorName)
             .map(ResponseEntity::ok)
-    );
+    ).doOnEach(sig -> auditService.audit(context, sig));
   }
 
   @Override
@@ -98,16 +103,17 @@ public class KafkaConnectController extends AbstractController implements KafkaC
                                                     String connectorName,
                                                     ServerWebExchange exchange) {
 
-    Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
+    var context = AccessContext.builder()
         .cluster(clusterName)
         .connect(connectName)
         .connectActions(ConnectAction.VIEW, ConnectAction.EDIT)
-        .build());
+        .auditOperation("deleteConnector")
+        .build();
 
-    return validateAccess.then(
+    return accessControlService.validateAccess(context).then(
         kafkaConnectService.deleteConnector(getCluster(clusterName), connectName, connectorName)
             .map(ResponseEntity::ok)
-    );
+    ).doOnEach(sig -> auditService.audit(context, sig));
   }
 
 
@@ -119,6 +125,12 @@ public class KafkaConnectController extends AbstractController implements KafkaC
       SortOrderDTO sortOrder,
       ServerWebExchange exchange
   ) {
+    var context = AccessContext.builder()
+        .cluster(clusterName)
+        .connectActions(ConnectAction.VIEW, ConnectAction.EDIT)
+        .auditOperation("getAllConnectors")
+        .build();
+
     var comparator = sortOrder == null || sortOrder.equals(SortOrderDTO.ASC)
         ? getConnectorsComparator(orderBy)
         : getConnectorsComparator(orderBy).reversed();
@@ -126,7 +138,8 @@ public class KafkaConnectController extends AbstractController implements KafkaC
         .filterWhen(dto -> accessControlService.isConnectAccessible(dto.getConnect(), clusterName))
         .filterWhen(dto -> accessControlService.isConnectorAccessible(dto.getConnect(), dto.getName(), clusterName));
 
-    return Mono.just(ResponseEntity.ok(job.sort(comparator)));
+    return Mono.just(ResponseEntity.ok(job.sort(comparator)))
+        .doOnEach(sig -> auditService.audit(context, sig));
   }
 
   @Override
@@ -135,17 +148,18 @@ public class KafkaConnectController extends AbstractController implements KafkaC
                                                                       String connectorName,
                                                                       ServerWebExchange exchange) {
 
-    Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
+    var context = AccessContext.builder()
         .cluster(clusterName)
         .connect(connectName)
         .connectActions(ConnectAction.VIEW)
-        .build());
+        .auditOperation("getConnectorConfig")
+        .build();
 
-    return validateAccess.then(
+    return accessControlService.validateAccess(context).then(
         kafkaConnectService
             .getConnectorConfig(getCluster(clusterName), connectName, connectorName)
             .map(ResponseEntity::ok)
-    );
+    ).doOnEach(sig -> auditService.audit(context, sig));
   }
 
   @Override
@@ -154,16 +168,18 @@ public class KafkaConnectController extends AbstractController implements KafkaC
                                                                Mono<Map<String, Object>> requestBody,
                                                                ServerWebExchange exchange) {
 
-    Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
+    var context = AccessContext.builder()
         .cluster(clusterName)
         .connect(connectName)
         .connectActions(ConnectAction.VIEW, ConnectAction.EDIT)
-        .build());
+        .auditOperation("setConnectorConfig")
+        .build();
 
-    return validateAccess.then(
+    return accessControlService.validateAccess(context).then(
         kafkaConnectService
             .setConnectorConfig(getCluster(clusterName), connectName, connectorName, requestBody)
-            .map(ResponseEntity::ok));
+            .map(ResponseEntity::ok))
+        .doOnEach(sig -> auditService.audit(context, sig));
   }
 
   @Override
@@ -172,17 +188,18 @@ public class KafkaConnectController extends AbstractController implements KafkaC
                                                          ConnectorActionDTO action,
                                                          ServerWebExchange exchange) {
 
-    Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
+    var context = AccessContext.builder()
         .cluster(clusterName)
         .connect(connectName)
         .connectActions(ConnectAction.VIEW, ConnectAction.EDIT)
-        .build());
+        .auditOperation("updateConnectorState")
+        .build();
 
-    return validateAccess.then(
+    return accessControlService.validateAccess(context).then(
         kafkaConnectService
             .updateConnectorState(getCluster(clusterName), connectName, connectorName, action)
             .map(ResponseEntity::ok)
-    );
+    ).doOnEach(sig -> auditService.audit(context, sig));
   }
 
   @Override
@@ -190,17 +207,18 @@ public class KafkaConnectController extends AbstractController implements KafkaC
                                                                String connectName,
                                                                String connectorName,
                                                                ServerWebExchange exchange) {
-    Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
+    var context = AccessContext.builder()
         .cluster(clusterName)
         .connect(connectName)
         .connectActions(ConnectAction.VIEW)
-        .build());
+        .auditOperation("getConnectorTasks")
+        .build();
 
-    return validateAccess.thenReturn(
+    return accessControlService.validateAccess(context).thenReturn(
         ResponseEntity
             .ok(kafkaConnectService
                 .getConnectorTasks(getCluster(clusterName), connectName, connectorName))
-    );
+    ).doOnEach(sig -> auditService.audit(context, sig));
   }
 
   @Override
@@ -208,34 +226,36 @@ public class KafkaConnectController extends AbstractController implements KafkaC
                                                          String connectorName, Integer taskId,
                                                          ServerWebExchange exchange) {
 
-    Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
+    var context = AccessContext.builder()
         .cluster(clusterName)
         .connect(connectName)
         .connectActions(ConnectAction.VIEW, ConnectAction.RESTART)
-        .build());
+        .auditOperation("restartConnectorTask")
+        .build();
 
-    return validateAccess.then(
+    return accessControlService.validateAccess(context).then(
         kafkaConnectService
             .restartConnectorTask(getCluster(clusterName), connectName, connectorName, taskId)
             .map(ResponseEntity::ok)
-    );
+    ).doOnEach(sig -> auditService.audit(context, sig));
   }
 
   @Override
   public Mono<ResponseEntity<Flux<ConnectorPluginDTO>>> getConnectorPlugins(
       String clusterName, String connectName, ServerWebExchange exchange) {
 
-    Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
+    var context = AccessContext.builder()
         .cluster(clusterName)
         .connect(connectName)
         .connectActions(ConnectAction.VIEW)
-        .build());
+        .auditOperation("getConnectorPlugins")
+        .build();
 
-    return validateAccess.then(
+    return accessControlService.validateAccess(context).then(
         Mono.just(
             ResponseEntity.ok(
                 kafkaConnectService.getConnectorPlugins(getCluster(clusterName), connectName)))
-    );
+    ).doOnEach(sig -> auditService.audit(context, sig));
   }
 
   @Override

+ 37 - 25
kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/KsqlController.java

@@ -9,6 +9,7 @@ import com.provectus.kafka.ui.model.KsqlTableDescriptionDTO;
 import com.provectus.kafka.ui.model.KsqlTableResponseDTO;
 import com.provectus.kafka.ui.model.rbac.AccessContext;
 import com.provectus.kafka.ui.model.rbac.permission.KsqlAction;
+import com.provectus.kafka.ui.service.audit.AuditService;
 import com.provectus.kafka.ui.service.ksql.KsqlServiceV2;
 import com.provectus.kafka.ui.service.rbac.AccessControlService;
 import java.util.List;
@@ -29,38 +30,43 @@ public class KsqlController extends AbstractController implements KsqlApi {
 
   private final KsqlServiceV2 ksqlServiceV2;
   private final AccessControlService accessControlService;
+  private final AuditService auditService;
 
   @Override
   public Mono<ResponseEntity<KsqlCommandV2ResponseDTO>> executeKsql(String clusterName,
-                                                                    Mono<KsqlCommandV2DTO>
-                                                                        ksqlCommand2Dto,
+                                                                    Mono<KsqlCommandV2DTO> ksqlCmdDo,
                                                                     ServerWebExchange exchange) {
-    Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
-        .cluster(clusterName)
-        .ksqlActions(KsqlAction.EXECUTE)
-        .build());
-
-    return validateAccess.then(
-        ksqlCommand2Dto.map(dto -> {
-          var id = ksqlServiceV2.registerCommand(
-              getCluster(clusterName),
-              dto.getKsql(),
-              Optional.ofNullable(dto.getStreamsProperties()).orElse(Map.of()));
-          return new KsqlCommandV2ResponseDTO().pipeId(id);
-        }).map(ResponseEntity::ok)
-    );
+    return ksqlCmdDo.flatMap(
+            command -> {
+              var context = AccessContext.builder()
+                  .cluster(clusterName)
+                  .ksqlActions(KsqlAction.EXECUTE)
+                  .auditOperation("executeKsql")
+                  .operationParams(command)
+                  .build();
+              return accessControlService.validateAccess(context).thenReturn(
+                      new KsqlCommandV2ResponseDTO().pipeId(
+                          ksqlServiceV2.registerCommand(
+                              getCluster(clusterName),
+                              command.getKsql(),
+                              Optional.ofNullable(command.getStreamsProperties()).orElse(Map.of()))))
+                  .doOnEach(sig -> auditService.audit(context, sig));
+            }
+        )
+        .map(ResponseEntity::ok);
   }
 
   @Override
   public Mono<ResponseEntity<Flux<KsqlResponseDTO>>> openKsqlResponsePipe(String clusterName,
                                                                           String pipeId,
                                                                           ServerWebExchange exchange) {
-    Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
+    var context = AccessContext.builder()
         .cluster(clusterName)
         .ksqlActions(KsqlAction.EXECUTE)
-        .build());
+        .auditOperation("openKsqlResponsePipe")
+        .build();
 
-    return validateAccess.thenReturn(
+    return accessControlService.validateAccess(context).thenReturn(
         ResponseEntity.ok(ksqlServiceV2.execute(pipeId)
             .map(table -> new KsqlResponseDTO()
                 .table(
@@ -74,22 +80,28 @@ public class KsqlController extends AbstractController implements KsqlApi {
   @Override
   public Mono<ResponseEntity<Flux<KsqlStreamDescriptionDTO>>> listStreams(String clusterName,
                                                                           ServerWebExchange exchange) {
-    Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
+    var context = AccessContext.builder()
         .cluster(clusterName)
         .ksqlActions(KsqlAction.EXECUTE)
-        .build());
+        .auditOperation("listStreams")
+        .build();
 
-    return validateAccess.thenReturn(ResponseEntity.ok(ksqlServiceV2.listStreams(getCluster(clusterName))));
+    return accessControlService.validateAccess(context)
+        .thenReturn(ResponseEntity.ok(ksqlServiceV2.listStreams(getCluster(clusterName))))
+        .doOnEach(sig -> auditService.audit(context, sig));
   }
 
   @Override
   public Mono<ResponseEntity<Flux<KsqlTableDescriptionDTO>>> listTables(String clusterName,
                                                                         ServerWebExchange exchange) {
-    Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
+    var context = AccessContext.builder()
         .cluster(clusterName)
         .ksqlActions(KsqlAction.EXECUTE)
-        .build());
+        .auditOperation("listTables")
+        .build();
 
-    return validateAccess.thenReturn(ResponseEntity.ok(ksqlServiceV2.listTables(getCluster(clusterName))));
+    return accessControlService.validateAccess(context)
+        .thenReturn(ResponseEntity.ok(ksqlServiceV2.listTables(getCluster(clusterName))))
+        .doOnEach(sig -> auditService.audit(context, sig));
   }
 }

+ 21 - 15
kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/MessagesController.java

@@ -21,6 +21,7 @@ import com.provectus.kafka.ui.model.rbac.AccessContext;
 import com.provectus.kafka.ui.model.rbac.permission.TopicAction;
 import com.provectus.kafka.ui.service.DeserializationService;
 import com.provectus.kafka.ui.service.MessagesService;
+import com.provectus.kafka.ui.service.audit.AuditService;
 import com.provectus.kafka.ui.service.rbac.AccessControlService;
 import java.util.List;
 import java.util.Map;
@@ -46,25 +47,26 @@ public class MessagesController extends AbstractController implements MessagesAp
   private final MessagesService messagesService;
   private final DeserializationService deserializationService;
   private final AccessControlService accessControlService;
+  private final AuditService auditService;
 
   @Override
   public Mono<ResponseEntity<Void>> deleteTopicMessages(
       String clusterName, String topicName, @Valid List<Integer> partitions,
       ServerWebExchange exchange) {
 
-    Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
+    var context = AccessContext.builder()
         .cluster(clusterName)
         .topic(topicName)
         .topicActions(MESSAGES_DELETE)
-        .build());
+        .build();
 
-    return validateAccess.then(
+    return accessControlService.validateAccess(context).<ResponseEntity<Void>>then(
         messagesService.deleteTopicMessages(
             getCluster(clusterName),
             topicName,
             Optional.ofNullable(partitions).orElse(List.of())
         ).thenReturn(ResponseEntity.ok().build())
-    );
+    ).doOnEach(sig -> auditService.audit(context, sig));
   }
 
   @Override
@@ -79,11 +81,12 @@ public class MessagesController extends AbstractController implements MessagesAp
                                                                            String keySerde,
                                                                            String valueSerde,
                                                                            ServerWebExchange exchange) {
-    final Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
+    var context = AccessContext.builder()
         .cluster(clusterName)
         .topic(topicName)
         .topicActions(MESSAGES_READ)
-        .build());
+        .auditOperation("getTopicMessages")
+        .build();
 
     seekType = seekType != null ? seekType : SeekTypeDTO.BEGINNING;
     seekDirection = seekDirection != null ? seekDirection : SeekDirectionDTO.FORWARD;
@@ -102,7 +105,9 @@ public class MessagesController extends AbstractController implements MessagesAp
         )
     );
 
-    return validateAccess.then(job);
+    return accessControlService.validateAccess(context)
+        .then(job)
+        .doOnEach(sig -> auditService.audit(context, sig));
   }
 
   @Override
@@ -110,17 +115,18 @@ public class MessagesController extends AbstractController implements MessagesAp
       String clusterName, String topicName, @Valid Mono<CreateTopicMessageDTO> createTopicMessage,
       ServerWebExchange exchange) {
 
-    Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
+    var context = AccessContext.builder()
         .cluster(clusterName)
         .topic(topicName)
         .topicActions(MESSAGES_PRODUCE)
-        .build());
+        .auditOperation("sendTopicMessages")
+        .build();
 
-    return validateAccess.then(
+    return accessControlService.validateAccess(context).then(
         createTopicMessage.flatMap(msg ->
             messagesService.sendMessage(getCluster(clusterName), topicName, msg).then()
         ).map(ResponseEntity::ok)
-    );
+    ).doOnEach(sig -> auditService.audit(context, sig));
   }
 
   /**
@@ -156,12 +162,12 @@ public class MessagesController extends AbstractController implements MessagesAp
                                                                  String topicName,
                                                                  SerdeUsageDTO use,
                                                                  ServerWebExchange exchange) {
-
-    Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
+    var context = AccessContext.builder()
         .cluster(clusterName)
         .topic(topicName)
         .topicActions(TopicAction.VIEW)
-        .build());
+        .auditOperation("getSerdes")
+        .build();
 
     TopicSerdeSuggestionDTO dto = new TopicSerdeSuggestionDTO()
         .key(use == SerdeUsageDTO.SERIALIZE
@@ -171,7 +177,7 @@ public class MessagesController extends AbstractController implements MessagesAp
             ? deserializationService.getSerdesForSerialize(getCluster(clusterName), topicName, VALUE)
             : deserializationService.getSerdesForDeserialize(getCluster(clusterName), topicName, VALUE));
 
-    return validateAccess.then(
+    return accessControlService.validateAccess(context).then(
         Mono.just(dto)
             .subscribeOn(Schedulers.boundedElastic())
             .map(ResponseEntity::ok)

+ 64 - 35
kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/SchemasController.java

@@ -13,8 +13,10 @@ import com.provectus.kafka.ui.model.SchemaSubjectsResponseDTO;
 import com.provectus.kafka.ui.model.rbac.AccessContext;
 import com.provectus.kafka.ui.model.rbac.permission.SchemaAction;
 import com.provectus.kafka.ui.service.SchemaRegistryService;
+import com.provectus.kafka.ui.service.audit.AuditService;
 import com.provectus.kafka.ui.service.rbac.AccessControlService;
 import java.util.List;
+import java.util.Map;
 import java.util.stream.Collectors;
 import javax.validation.Valid;
 import lombok.RequiredArgsConstructor;
@@ -37,6 +39,7 @@ public class SchemasController extends AbstractController implements SchemasApi
 
   private final SchemaRegistryService schemaRegistryService;
   private final AccessControlService accessControlService;
+  private final AuditService auditService;
 
   @Override
   protected KafkaCluster getCluster(String clusterName) {
@@ -51,13 +54,14 @@ public class SchemasController extends AbstractController implements SchemasApi
   public Mono<ResponseEntity<CompatibilityCheckResponseDTO>> checkSchemaCompatibility(
       String clusterName, String subject, @Valid Mono<NewSchemaSubjectDTO> newSchemaSubjectMono,
       ServerWebExchange exchange) {
-    Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
+    var context = AccessContext.builder()
         .cluster(clusterName)
         .schema(subject)
         .schemaActions(SchemaAction.VIEW)
-        .build());
+        .auditOperation("checkSchemaCompatibility")
+        .build();
 
-    return validateAccess.then(
+    return accessControlService.validateAccess(context).then(
         newSchemaSubjectMono.flatMap(subjectDTO ->
                 schemaRegistryService.checksSchemaCompatibility(
                     getCluster(clusterName),
@@ -66,19 +70,20 @@ public class SchemasController extends AbstractController implements SchemasApi
                 ))
             .map(kafkaSrMapper::toDto)
             .map(ResponseEntity::ok)
-    );
+    ).doOnEach(sig -> auditService.audit(context, sig));
   }
 
   @Override
   public Mono<ResponseEntity<SchemaSubjectDTO>> createNewSchema(
       String clusterName, @Valid Mono<NewSchemaSubjectDTO> newSchemaSubjectMono,
       ServerWebExchange exchange) {
-    Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
+    var context = AccessContext.builder()
         .cluster(clusterName)
         .schemaActions(SchemaAction.CREATE)
-        .build());
+        .auditOperation("createNewSchema")
+        .build();
 
-    return validateAccess.then(
+    return accessControlService.validateAccess(context).then(
         newSchemaSubjectMono.flatMap(newSubject ->
                 schemaRegistryService.registerNewSchema(
                     getCluster(clusterName),
@@ -87,20 +92,22 @@ public class SchemasController extends AbstractController implements SchemasApi
                 )
             ).map(kafkaSrMapper::toDto)
             .map(ResponseEntity::ok)
-    );
+    ).doOnEach(sig -> auditService.audit(context, sig));
   }
 
   @Override
   public Mono<ResponseEntity<Void>> deleteLatestSchema(
       String clusterName, String subject, ServerWebExchange exchange) {
-    Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
+    var context = AccessContext.builder()
         .cluster(clusterName)
         .schema(subject)
         .schemaActions(SchemaAction.DELETE)
-        .build());
+        .auditOperation("deleteLatestSchema")
+        .build();
 
-    return validateAccess.then(
+    return accessControlService.validateAccess(context).then(
         schemaRegistryService.deleteLatestSchemaSubject(getCluster(clusterName), subject)
+            .doOnEach(sig -> auditService.audit(context, sig))
             .thenReturn(ResponseEntity.ok().build())
     );
   }
@@ -108,14 +115,16 @@ public class SchemasController extends AbstractController implements SchemasApi
   @Override
   public Mono<ResponseEntity<Void>> deleteSchema(
       String clusterName, String subject, ServerWebExchange exchange) {
-    Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
+    var context = AccessContext.builder()
         .cluster(clusterName)
         .schema(subject)
         .schemaActions(SchemaAction.DELETE)
-        .build());
+        .auditOperation("deleteSchema")
+        .build();
 
-    return validateAccess.then(
+    return accessControlService.validateAccess(context).then(
         schemaRegistryService.deleteSchemaSubjectEntirely(getCluster(clusterName), subject)
+            .doOnEach(sig -> auditService.audit(context, sig))
             .thenReturn(ResponseEntity.ok().build())
     );
   }
@@ -123,14 +132,16 @@ public class SchemasController extends AbstractController implements SchemasApi
   @Override
   public Mono<ResponseEntity<Void>> deleteSchemaByVersion(
       String clusterName, String subjectName, Integer version, ServerWebExchange exchange) {
-    Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
+    var context = AccessContext.builder()
         .cluster(clusterName)
         .schema(subjectName)
         .schemaActions(SchemaAction.DELETE)
-        .build());
+        .auditOperation("deleteSchemaByVersion")
+        .build();
 
-    return validateAccess.then(
+    return accessControlService.validateAccess(context).then(
         schemaRegistryService.deleteSchemaSubjectByVersion(getCluster(clusterName), subjectName, version)
+            .doOnEach(sig -> auditService.audit(context, sig))
             .thenReturn(ResponseEntity.ok().build())
     );
   }
@@ -138,16 +149,20 @@ public class SchemasController extends AbstractController implements SchemasApi
   @Override
   public Mono<ResponseEntity<Flux<SchemaSubjectDTO>>> getAllVersionsBySubject(
       String clusterName, String subjectName, ServerWebExchange exchange) {
-    Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
+    var context = AccessContext.builder()
         .cluster(clusterName)
         .schema(subjectName)
         .schemaActions(SchemaAction.VIEW)
-        .build());
+        .auditOperation("getAllVersionsBySubject")
+        .build();
 
     Flux<SchemaSubjectDTO> schemas =
         schemaRegistryService.getAllVersionsBySubject(getCluster(clusterName), subjectName)
             .map(kafkaSrMapper::toDto);
-    return validateAccess.thenReturn(ResponseEntity.ok(schemas));
+
+    return accessControlService.validateAccess(context)
+        .thenReturn(ResponseEntity.ok(schemas))
+        .doOnEach(sig -> auditService.audit(context, sig));
   }
 
   @Override
@@ -163,34 +178,37 @@ public class SchemasController extends AbstractController implements SchemasApi
   public Mono<ResponseEntity<SchemaSubjectDTO>> getLatestSchema(String clusterName,
                                                                 String subject,
                                                                 ServerWebExchange exchange) {
-    Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
+    var context = AccessContext.builder()
         .cluster(clusterName)
         .schema(subject)
         .schemaActions(SchemaAction.VIEW)
-        .build());
+        .auditOperation("getLatestSchema")
+        .build();
 
-    return validateAccess.then(
+    return accessControlService.validateAccess(context).then(
         schemaRegistryService.getLatestSchemaVersionBySubject(getCluster(clusterName), subject)
             .map(kafkaSrMapper::toDto)
             .map(ResponseEntity::ok)
-    );
+    ).doOnEach(sig -> auditService.audit(context, sig));
   }
 
   @Override
   public Mono<ResponseEntity<SchemaSubjectDTO>> getSchemaByVersion(
       String clusterName, String subject, Integer version, ServerWebExchange exchange) {
-    Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
+    var context = AccessContext.builder()
         .cluster(clusterName)
         .schema(subject)
         .schemaActions(SchemaAction.VIEW)
-        .build());
+        .auditOperation("getSchemaByVersion")
+        .operationParams(Map.of("subject", subject, "version", version))
+        .build();
 
-    return validateAccess.then(
+    return accessControlService.validateAccess(context).then(
         schemaRegistryService.getSchemaSubjectByVersion(
                 getCluster(clusterName), subject, version)
             .map(kafkaSrMapper::toDto)
             .map(ResponseEntity::ok)
-    );
+    ).doOnEach(sig -> auditService.audit(context, sig));
   }
 
   @Override
@@ -199,6 +217,11 @@ public class SchemasController extends AbstractController implements SchemasApi
                                                                     @Valid Integer perPage,
                                                                     @Valid String search,
                                                                     ServerWebExchange serverWebExchange) {
+    var context = AccessContext.builder()
+        .cluster(clusterName)
+        .auditOperation("getSchemas")
+        .build();
+
     return schemaRegistryService
         .getAllSubjectNames(getCluster(clusterName))
         .flatMapIterable(l -> l)
@@ -220,25 +243,28 @@ public class SchemasController extends AbstractController implements SchemasApi
           return schemaRegistryService.getAllLatestVersionSchemas(getCluster(clusterName), subjectsToRender)
               .map(subjs -> subjs.stream().map(kafkaSrMapper::toDto).toList())
               .map(subjs -> new SchemaSubjectsResponseDTO().pageCount(totalPages).schemas(subjs));
-        }).map(ResponseEntity::ok);
+        }).map(ResponseEntity::ok)
+        .doOnEach(sig -> auditService.audit(context, sig));
   }
 
   @Override
   public Mono<ResponseEntity<Void>> updateGlobalSchemaCompatibilityLevel(
       String clusterName, @Valid Mono<CompatibilityLevelDTO> compatibilityLevelMono,
       ServerWebExchange exchange) {
-    Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
+    var context = AccessContext.builder()
         .cluster(clusterName)
         .schemaActions(SchemaAction.MODIFY_GLOBAL_COMPATIBILITY)
-        .build());
+        .auditOperation("updateGlobalSchemaCompatibilityLevel")
+        .build();
 
-    return validateAccess.then(
+    return accessControlService.validateAccess(context).then(
         compatibilityLevelMono
             .flatMap(compatibilityLevelDTO ->
                 schemaRegistryService.updateGlobalSchemaCompatibility(
                     getCluster(clusterName),
                     kafkaSrMapper.fromDto(compatibilityLevelDTO.getCompatibility())
                 ))
+            .doOnEach(sig -> auditService.audit(context, sig))
             .thenReturn(ResponseEntity.ok().build())
     );
   }
@@ -247,12 +273,14 @@ public class SchemasController extends AbstractController implements SchemasApi
   public Mono<ResponseEntity<Void>> updateSchemaCompatibilityLevel(
       String clusterName, String subject, @Valid Mono<CompatibilityLevelDTO> compatibilityLevelMono,
       ServerWebExchange exchange) {
-    Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
+    var context = AccessContext.builder()
         .cluster(clusterName)
         .schemaActions(SchemaAction.EDIT)
-        .build());
+        .auditOperation("updateSchemaCompatibilityLevel")
+        .operationParams(Map.of("subject", subject))
+        .build();
 
-    return validateAccess.then(
+    return accessControlService.validateAccess(context).then(
         compatibilityLevelMono
             .flatMap(compatibilityLevelDTO ->
                 schemaRegistryService.updateSchemaCompatibility(
@@ -260,6 +288,7 @@ public class SchemasController extends AbstractController implements SchemasApi
                     subject,
                     kafkaSrMapper.fromDto(compatibilityLevelDTO.getCompatibility())
                 ))
+            .doOnEach(sig -> auditService.audit(context, sig))
             .thenReturn(ResponseEntity.ok().build())
     );
   }

+ 84 - 53
kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/TopicsController.java

@@ -8,6 +8,8 @@ import static com.provectus.kafka.ui.model.rbac.permission.TopicAction.VIEW;
 import static java.util.stream.Collectors.toList;
 
 import com.provectus.kafka.ui.api.TopicsApi;
+import com.provectus.kafka.ui.config.auth.AuthenticatedUser;
+import com.provectus.kafka.ui.config.auth.RbacUser;
 import com.provectus.kafka.ui.mapper.ClusterMapper;
 import com.provectus.kafka.ui.model.InternalTopic;
 import com.provectus.kafka.ui.model.InternalTopicConfig;
@@ -27,19 +29,26 @@ import com.provectus.kafka.ui.model.TopicsResponseDTO;
 import com.provectus.kafka.ui.model.rbac.AccessContext;
 import com.provectus.kafka.ui.service.TopicsService;
 import com.provectus.kafka.ui.service.analyze.TopicAnalysisService;
+import com.provectus.kafka.ui.service.audit.AuditService;
 import com.provectus.kafka.ui.service.rbac.AccessControlService;
 import java.util.Comparator;
 import java.util.List;
+import java.util.Map;
+import java.util.Set;
 import javax.validation.Valid;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
 import org.springframework.http.HttpStatus;
 import org.springframework.http.ResponseEntity;
+import org.springframework.security.core.context.ReactiveSecurityContextHolder;
+import org.springframework.security.core.context.SecurityContext;
 import org.springframework.web.bind.annotation.RestController;
 import org.springframework.web.server.ServerWebExchange;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Scheduler;
+import reactor.core.scheduler.Schedulers;
 
 @RestController
 @RequiredArgsConstructor
@@ -52,69 +61,76 @@ public class TopicsController extends AbstractController implements TopicsApi {
   private final TopicAnalysisService topicAnalysisService;
   private final ClusterMapper clusterMapper;
   private final AccessControlService accessControlService;
+  private final AuditService auditService;
 
   @Override
   public Mono<ResponseEntity<TopicDTO>> createTopic(
       String clusterName, @Valid Mono<TopicCreationDTO> topicCreation, ServerWebExchange exchange) {
 
-    Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
+    var context = AccessContext.builder()
         .cluster(clusterName)
         .topicActions(CREATE)
-        .build());
+        .auditOperation("createTopic")
+        .build();
 
-    return validateAccess.then(
+    return accessControlService.validateAccess(context).then(
         topicsService.createTopic(getCluster(clusterName), topicCreation)
             .map(clusterMapper::toTopic)
             .map(s -> new ResponseEntity<>(s, HttpStatus.OK))
             .switchIfEmpty(Mono.just(ResponseEntity.notFound().build()))
-    );
+    ).doOnEach(sig -> auditService.audit(context, sig));
   }
 
   @Override
   public Mono<ResponseEntity<TopicDTO>> recreateTopic(String clusterName,
                                                       String topicName, ServerWebExchange exchange) {
-    Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
+    var context = AccessContext.builder()
         .cluster(clusterName)
         .topic(topicName)
         .topicActions(VIEW, CREATE, DELETE)
-        .build());
+        .auditOperation("recreateTopic")
+        .build();
 
-    return validateAccess.then(
+    return accessControlService.validateAccess(context).then(
         topicsService.recreateTopic(getCluster(clusterName), topicName)
             .map(clusterMapper::toTopic)
             .map(s -> new ResponseEntity<>(s, HttpStatus.CREATED))
-    );
+    ).doOnEach(sig -> auditService.audit(context, sig));
   }
 
   @Override
   public Mono<ResponseEntity<TopicDTO>> cloneTopic(
       String clusterName, String topicName, String newTopicName, ServerWebExchange exchange) {
 
-    Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
+    var context = AccessContext.builder()
         .cluster(clusterName)
         .topic(topicName)
         .topicActions(VIEW, CREATE)
-        .build());
+        .auditOperation("cloneTopic")
+        .operationParams(Map.of("newTopicName", newTopicName))
+        .build();
 
-    return validateAccess.then(topicsService.cloneTopic(getCluster(clusterName), topicName, newTopicName)
-        .map(clusterMapper::toTopic)
-        .map(s -> new ResponseEntity<>(s, HttpStatus.CREATED))
-    );
+    return accessControlService.validateAccess(context)
+        .then(topicsService.cloneTopic(getCluster(clusterName), topicName, newTopicName)
+            .map(clusterMapper::toTopic)
+            .map(s -> new ResponseEntity<>(s, HttpStatus.CREATED))
+        ).doOnEach(sig -> auditService.audit(context, sig));
   }
 
   @Override
   public Mono<ResponseEntity<Void>> deleteTopic(
       String clusterName, String topicName, ServerWebExchange exchange) {
 
-    Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
+    var context = AccessContext.builder()
         .cluster(clusterName)
         .topic(topicName)
         .topicActions(DELETE)
-        .build());
+        .auditOperation("deleteTopic")
+        .build();
 
-    return validateAccess.then(
+    return accessControlService.validateAccess(context).then(
         topicsService.deleteTopic(getCluster(clusterName), topicName).map(ResponseEntity::ok)
-    );
+    ).doOnEach(sig -> auditService.audit(context, sig));
   }
 
 
@@ -122,13 +138,14 @@ public class TopicsController extends AbstractController implements TopicsApi {
   public Mono<ResponseEntity<Flux<TopicConfigDTO>>> getTopicConfigs(
       String clusterName, String topicName, ServerWebExchange exchange) {
 
-    Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
+    var context = AccessContext.builder()
         .cluster(clusterName)
         .topic(topicName)
         .topicActions(VIEW)
-        .build());
+        .auditOperation("getTopicConfigs")
+        .build();
 
-    return validateAccess.then(
+    return accessControlService.validateAccess(context).then(
         topicsService.getTopicConfigs(getCluster(clusterName), topicName)
             .map(lst -> lst.stream()
                 .map(InternalTopicConfig::from)
@@ -136,24 +153,25 @@ public class TopicsController extends AbstractController implements TopicsApi {
                 .collect(toList()))
             .map(Flux::fromIterable)
             .map(ResponseEntity::ok)
-    );
+    ).doOnEach(sig -> auditService.audit(context, sig));
   }
 
   @Override
   public Mono<ResponseEntity<TopicDetailsDTO>> getTopicDetails(
       String clusterName, String topicName, ServerWebExchange exchange) {
 
-    var validatedAccess = accessControlService.withAccess(AccessContext.builder()
+    var context = AccessContext.builder()
         .cluster(clusterName)
         .topic(topicName)
         .topicActions(VIEW)
-        .build());
+        .auditOperation("getTopicDetails")
+        .build();
 
-    return validatedAccess.then(
+    return accessControlService.validateAccess(context).then(
         topicsService.getTopicDetails(getCluster(clusterName), topicName)
             .map(clusterMapper::toTopicDetails)
             .map(ResponseEntity::ok)
-    );
+    ).doOnEach(sig -> auditService.audit(context, sig));
   }
 
   @Override
@@ -166,6 +184,11 @@ public class TopicsController extends AbstractController implements TopicsApi {
                                                            @Valid SortOrderDTO sortOrder,
                                                            ServerWebExchange exchange) {
 
+    AccessContext context = AccessContext.builder()
+        .cluster(clusterName)
+        .auditOperation("getTopics")
+        .build();
+
     return topicsService.getTopicsForPagination(getCluster(clusterName))
         .flatMap(existingTopics -> {
           int pageSize = perPage != null && perPage > 0 ? perPage : DEFAULT_PAGE_SIZE;
@@ -196,7 +219,8 @@ public class TopicsController extends AbstractController implements TopicsApi {
                       .topics(topicsToRender.stream().map(clusterMapper::toTopic).collect(toList()))
                       .pageCount(totalPages));
         })
-        .map(ResponseEntity::ok);
+        .map(ResponseEntity::ok)
+        .doOnEach(sig -> auditService.audit(context, sig));
   }
 
   @Override
@@ -204,20 +228,19 @@ public class TopicsController extends AbstractController implements TopicsApi {
       String clusterName, String topicName, @Valid Mono<TopicUpdateDTO> topicUpdate,
       ServerWebExchange exchange) {
 
-    var validatedAccess = accessControlService.withAccess(AccessContext.builder()
+    var context = AccessContext.builder()
         .cluster(clusterName)
         .topic(topicName)
         .topicActions(VIEW, EDIT)
-        .operationDescription("Topic update")
-        .operationParams(topicUpdate)
-        .build());
+        .auditOperation("updateTopic")
+        .build();
 
-    return validatedAccess.then(
+    return accessControlService.validateAccess(context).then(
         topicsService
             .updateTopic(getCluster(clusterName), topicName, topicUpdate)
             .map(clusterMapper::toTopic)
             .map(ResponseEntity::ok)
-    );
+    ).doOnEach(sig -> auditService.audit(context, sig));
   }
 
   @Override
@@ -226,17 +249,17 @@ public class TopicsController extends AbstractController implements TopicsApi {
       Mono<PartitionsIncreaseDTO> partitionsIncrease,
       ServerWebExchange exchange) {
 
-    Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
+    var context = AccessContext.builder()
         .cluster(clusterName)
         .topic(topicName)
         .topicActions(VIEW, EDIT)
-        .build());
+        .build();
 
-    return validateAccess.then(
+    return accessControlService.validateAccess(context).then(
         partitionsIncrease.flatMap(partitions ->
             topicsService.increaseTopicPartitions(getCluster(clusterName), topicName, partitions)
         ).map(ResponseEntity::ok)
-    );
+    ).doOnEach(sig -> auditService.audit(context, sig));
   }
 
   @Override
@@ -245,31 +268,34 @@ public class TopicsController extends AbstractController implements TopicsApi {
       Mono<ReplicationFactorChangeDTO> replicationFactorChange,
       ServerWebExchange exchange) {
 
-    Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
+    var context = AccessContext.builder()
         .cluster(clusterName)
         .topic(topicName)
         .topicActions(VIEW, EDIT)
-        .build());
+        .auditOperation("changeReplicationFactor")
+        .build();
 
-    return validateAccess.then(
+    return accessControlService.validateAccess(context).then(
         replicationFactorChange
             .flatMap(rfc ->
                 topicsService.changeReplicationFactor(getCluster(clusterName), topicName, rfc))
             .map(ResponseEntity::ok)
-    );
+    ).doOnEach(sig -> auditService.audit(context, sig));
   }
 
   @Override
   public Mono<ResponseEntity<Void>> analyzeTopic(String clusterName, String topicName, ServerWebExchange exchange) {
 
-    Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
+    var context = AccessContext.builder()
         .cluster(clusterName)
         .topic(topicName)
         .topicActions(MESSAGES_READ)
-        .build());
+        .auditOperation("analyzeTopic")
+        .build();
 
-    return validateAccess.then(
+    return accessControlService.validateAccess(context).then(
         topicAnalysisService.analyze(getCluster(clusterName), topicName)
+            .doOnEach(sig -> auditService.audit(context, sig))
             .thenReturn(ResponseEntity.ok().build())
     );
   }
@@ -277,15 +303,17 @@ public class TopicsController extends AbstractController implements TopicsApi {
   @Override
   public Mono<ResponseEntity<Void>> cancelTopicAnalysis(String clusterName, String topicName,
                                                         ServerWebExchange exchange) {
-    Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
+    var context = AccessContext.builder()
         .cluster(clusterName)
         .topic(topicName)
         .topicActions(MESSAGES_READ)
-        .build());
-
-    topicAnalysisService.cancelAnalysis(getCluster(clusterName), topicName);
+        .auditOperation("cancelTopicAnalysis")
+        .build();
 
-    return validateAccess.thenReturn(ResponseEntity.ok().build());
+    return accessControlService.validateAccess(context)
+        .then(Mono.fromRunnable(() -> topicAnalysisService.cancelAnalysis(getCluster(clusterName), topicName)))
+        .doOnEach(sig -> auditService.audit(context, sig))
+        .thenReturn(ResponseEntity.ok().build());
   }
 
 
@@ -294,15 +322,18 @@ public class TopicsController extends AbstractController implements TopicsApi {
                                                                  String topicName,
                                                                  ServerWebExchange exchange) {
 
-    Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
+    var context = AccessContext.builder()
         .cluster(clusterName)
         .topic(topicName)
         .topicActions(MESSAGES_READ)
-        .build());
+        .auditOperation("getTopicAnalysis")
+        .build();
 
-    return validateAccess.thenReturn(topicAnalysisService.getTopicAnalysis(getCluster(clusterName), topicName)
+    return accessControlService.validateAccess(context)
+        .thenReturn(topicAnalysisService.getTopicAnalysis(getCluster(clusterName), topicName)
         .map(ResponseEntity::ok)
-        .orElseGet(() -> ResponseEntity.notFound().build()));
+        .orElseGet(() -> ResponseEntity.notFound().build()))
+        .doOnEach(sig -> auditService.audit(context, sig));
   }
 
   private Comparator<InternalTopic> getComparatorForTopic(

+ 5 - 5
kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/rbac/AccessContext.java

@@ -41,7 +41,7 @@ public class AccessContext {
 
   Collection<AclAction> aclActions;
 
-  String operationDescription;
+  String operationName;
   Object operationParams;
 
   public static AccessContextBuilder builder() {
@@ -63,7 +63,7 @@ public class AccessContext {
     private Collection<SchemaAction> schemaActions = Collections.emptySet();
     private Collection<KsqlAction> ksqlActions = Collections.emptySet();
     private Collection<AclAction> aclActions = Collections.emptySet();
-    String operationDescription;
+    String operationName;
     Object operationParams;
 
     private AccessContextBuilder() {
@@ -147,8 +147,8 @@ public class AccessContext {
       return this;
     }
 
-    public AccessContextBuilder operationDescription(String description) {
-      this.operationDescription = operationDescription;
+    public AccessContextBuilder auditOperation(String operationName) {
+      this.operationName = operationName;
       return this;
     }
 
@@ -172,7 +172,7 @@ public class AccessContext {
           connector,
           schema, schemaActions,
           ksqlActions, aclActions,
-          operationDescription, operationParams);
+          operationName, operationParams);
     }
   }
 }

+ 96 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/audit/AuditRecord.java

@@ -0,0 +1,96 @@
+package com.provectus.kafka.ui.service.audit;
+
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.databind.json.JsonMapper;
+import com.provectus.kafka.ui.exception.CustomBaseException;
+import com.provectus.kafka.ui.exception.ValidationException;
+import com.provectus.kafka.ui.model.rbac.AccessContext;
+import com.provectus.kafka.ui.model.rbac.Resource;
+import com.provectus.kafka.ui.model.rbac.permission.PermissibleAction;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import javax.annotation.Nullable;
+import lombok.SneakyThrows;
+import org.springframework.security.access.AccessDeniedException;
+
+record AuditRecord(String timestamp,
+                   String userPrincipal,  //TODO: discuss - rename to username?
+                   String clusterName,
+                   List<AuditResource> resources,
+                   String operation,
+                   Object operationParams,
+                   OperationResult result) {
+
+  static final JsonMapper MAPPER = new JsonMapper();
+
+  static {
+    MAPPER.setSerializationInclusion(JsonInclude.Include.NON_NULL);
+  }
+
+  @SneakyThrows
+  String toJson() {
+    return MAPPER.writeValueAsString(this);
+  }
+
+  record AuditResource(PermissibleAction accessType, Resource type, @Nullable Object id) {
+
+    static List<AuditResource> getAccessedResources(AccessContext ctx) {
+      List<AuditResource> resources = new ArrayList<>();
+      ctx.getClusterConfigActions()
+          .forEach(a -> resources.add(new AuditResource(a, Resource.CLUSTERCONFIG, null)));
+      ctx.getTopicActions()
+          .forEach(a -> resources.add(new AuditResource(a, Resource.TOPIC, nameId(ctx.getTopic()))));
+      ctx.getConsumerGroupActions()
+          .forEach(a -> resources.add(new AuditResource(a, Resource.CONSUMER, nameId(ctx.getConsumerGroup()))));
+      ctx.getConnectActions()
+          .forEach(a -> {
+            Map<String, String> resourceId = new LinkedHashMap<>();
+            resourceId.put("connect", ctx.getConnect());
+            if (ctx.getConnector() != null) {
+              resourceId.put("connector", ctx.getConnector());
+            }
+            resources.add(new AuditResource(a, Resource.CONNECT, resourceId));
+          });
+      ctx.getSchemaActions()
+          .forEach(a -> resources.add(new AuditResource(a, Resource.SCHEMA, nameId(ctx.getSchema()))));
+      ctx.getKsqlActions()
+          .forEach(a -> resources.add(new AuditResource(a, Resource.KSQL, null)));
+      ctx.getAclActions()
+          .forEach(a -> resources.add(new AuditResource(a, Resource.ACL, null)));
+      return resources;
+    }
+
+    @Nullable
+    private static Map<String, Object> nameId(@Nullable String name) {
+      return name != null ? Map.of("name", name) : null;
+    }
+  }
+
+  record OperationResult(boolean success, OperationError error) {
+
+    static OperationResult successful() {
+      return new OperationResult(true, null);
+    }
+
+    static OperationResult error(Throwable th) {
+      OperationError err = OperationError.UNRECOGNIZED_ERROR;
+      if (th instanceof AccessDeniedException) {
+        err = OperationError.ACCESS_DENIED;
+      } else if (th instanceof ValidationException) {
+        err = OperationError.VALIDATION_ERROR;
+      } else if (th instanceof CustomBaseException) {
+        err = OperationError.EXECUTION_ERROR;
+      }
+      return new OperationResult(false, err);
+    }
+
+    enum OperationError {
+      ACCESS_DENIED,
+      VALIDATION_ERROR,
+      EXECUTION_ERROR,
+      UNRECOGNIZED_ERROR
+    }
+  }
+}

+ 47 - 9
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/audit/AuditService.java

@@ -2,6 +2,7 @@ package com.provectus.kafka.ui.service.audit;
 
 import com.provectus.kafka.ui.config.ClustersProperties;
 import com.provectus.kafka.ui.config.auth.AuthenticatedUser;
+import com.provectus.kafka.ui.config.auth.RbacUser;
 import com.provectus.kafka.ui.model.KafkaCluster;
 import com.provectus.kafka.ui.model.rbac.AccessContext;
 import com.provectus.kafka.ui.service.AdminClientService;
@@ -14,17 +15,24 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import javax.annotation.Nullable;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerConfig;
+import org.springframework.security.core.context.SecurityContext;
 import org.springframework.stereotype.Service;
+import reactor.core.publisher.Mono;
+import reactor.core.publisher.Signal;
+import reactor.core.scheduler.Schedulers;
 
 @Slf4j
 @Service
 public class AuditService implements Closeable {
 
+  private static final Mono<AuthenticatedUser> NO_AUTH_USER = Mono.just(new AuthenticatedUser("Unknown", Set.of()));
+
   private static final String DEFAULT_AUDIT_TOPIC_NAME = "__kui-audit-log";
   private static final int DEFAULT_AUDIT_TOPIC_PARTITIONS = 1;
   private static final Map<String, String> DEFAULT_AUDIT_TOPIC_CONFIG = Map.of(
@@ -68,7 +76,7 @@ public class AuditService implements Closeable {
     if (topicAudit && createTopicIfNeeded(cluster, ac, auditTopicName, auditProps)) {
       producer = MessagesService.createProducer(cluster, AUDIT_PRODUCER_CONFIG);
     }
-    auditWriters.put(cluster.getName(), new AuditWriter(auditTopicName, producer, consoleAudit));
+    auditWriters.put(cluster.getName(), new AuditWriter(cluster.getName(), auditTopicName, producer, consoleAudit));
     log.info("Audit service initialized for cluster '{}'", cluster.getName());
   }
 
@@ -118,18 +126,48 @@ public class AuditService implements Closeable {
     log.error("-----------------------------------------------------------------");
   }
 
-  public void sendAuditRecord(AccessContext ctx, AuthenticatedUser user) {
+  public void audit(AccessContext acxt, Signal<?> sig) {
+    if (sig.isOnComplete()) {
+      extractUser(sig)
+          .doOnNext(u -> sendAuditRecord(acxt, u))
+          .subscribe();
+    } else if (sig.isOnError()) {
+      extractUser(sig)
+          .doOnNext(u -> sendAuditRecord(acxt, u, sig.getThrowable()))
+          .subscribe();
+    }
+  }
+
+  private Mono<AuthenticatedUser> extractUser(Signal<?> sig) {
+    //see ReactiveSecurityContextHolder for impl details
+    Object key = SecurityContext.class;
+    if (sig.getContextView().hasKey(key)) {
+      return sig.getContextView().<Mono<SecurityContext>>get(key)
+          .map(context -> context.getAuthentication().getPrincipal())
+          .cast(RbacUser.class)
+          .map(user -> new AuthenticatedUser(user.name(), user.groups()))
+          .switchIfEmpty(NO_AUTH_USER);
+    } else {
+      return NO_AUTH_USER.publishOn(Schedulers.immediate());
+    }
+  }
+
+  private void sendAuditRecord(AccessContext ctx, AuthenticatedUser user) {
     sendAuditRecord(ctx, user, null);
   }
 
-  public void sendAuditRecord(AccessContext ctx, AuthenticatedUser user, @Nullable Throwable th) {
-    if (ctx.getCluster() != null) {
-      var writer = auditWriters.get(ctx.getCluster());
-      if (writer != null) {
-        writer.write(ctx, user, th);
+  private void sendAuditRecord(AccessContext ctx, AuthenticatedUser user, @Nullable Throwable th) {
+    try {
+      if (ctx.getCluster() != null) {
+        var writer = auditWriters.get(ctx.getCluster());
+        if (writer != null) {
+          writer.write(ctx, user, th);
+        }
+      } else {
+        //TODO: discuss app config changes - where to log?
       }
-    } else {
-      //TODO: discuss app config - where to log?
+    } catch (Exception e) {
+      log.warn("Error sending audit record", e);
     }
   }
 

+ 12 - 87
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/audit/AuditWriter.java

@@ -1,31 +1,24 @@
 package com.provectus.kafka.ui.service.audit;
 
-import com.fasterxml.jackson.databind.json.JsonMapper;
 import com.provectus.kafka.ui.config.auth.AuthenticatedUser;
-import com.provectus.kafka.ui.exception.CustomBaseException;
-import com.provectus.kafka.ui.exception.ValidationException;
 import com.provectus.kafka.ui.model.rbac.AccessContext;
-import com.provectus.kafka.ui.model.rbac.Resource;
-import com.provectus.kafka.ui.model.rbac.permission.PermissibleAction;
+import com.provectus.kafka.ui.service.audit.AuditRecord.AuditResource;
+import com.provectus.kafka.ui.service.audit.AuditRecord.OperationResult;
 import java.io.Closeable;
 import java.nio.charset.StandardCharsets;
 import java.time.Instant;
 import java.time.format.DateTimeFormatter;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
 import java.util.Optional;
 import javax.annotation.Nullable;
-import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.springframework.security.access.AccessDeniedException;
 
 @Slf4j
-record AuditWriter(String targetTopic,
+record AuditWriter(String clusterName,
+                   String targetTopic,
                    @Nullable KafkaProducer<byte[], byte[]> producer,
                    boolean logToConsole) implements Closeable {
 
@@ -37,39 +30,15 @@ record AuditWriter(String targetTopic,
         new AuditRecord(
             DateTimeFormatter.ISO_INSTANT.format(Instant.now()),
             user.principal(),
-            ctx.getCluster(),
-            getAccessedResources(ctx),
-            ctx.getOperationDescription(),
-            th == null ? OperationResult.successful() : OperationResult.error(th),
-            ctx.getOperationParams()
+            clusterName,
+            AuditResource.getAccessedResources(ctx),
+            ctx.getOperationName(),
+            ctx.getOperationParams(),
+            th == null ? OperationResult.successful() : OperationResult.error(th)
         )
     );
   }
 
-  private List<AuditResource> getAccessedResources(AccessContext ctx) {
-    List<AuditResource> resources = new ArrayList<>();
-    ctx.getClusterConfigActions()
-        .forEach(a -> resources.add(new AuditResource(a, Resource.CLUSTERCONFIG, null)));
-    ctx.getTopicActions()
-        .forEach(a -> resources.add(new AuditResource(a, Resource.TOPIC, nameId(ctx.getTopic()))));
-    ctx.getConsumerGroupActions()
-        .forEach(a -> resources.add(new AuditResource(a, Resource.CONSUMER, nameId(ctx.getConsumerGroup()))));
-    ctx.getConnectActions()
-        .forEach(a -> resources.add(new AuditResource(a, Resource.CONNECT, nameId(ctx.getConnect()))));
-    ctx.getSchemaActions()
-        .forEach(a -> resources.add(new AuditResource(a, Resource.SCHEMA, nameId(ctx.getSchema()))));
-    ctx.getKsqlActions()
-        .forEach(a -> resources.add(new AuditResource(a, Resource.KSQL, null)));
-    ctx.getAclActions()
-        .forEach(a -> resources.add(new AuditResource(a, Resource.ACL, null)));
-    return resources;
-  }
-
-  @Nullable
-  private Map<String, Object> nameId(@Nullable String name) {
-    return name != null ? Map.of("name", name) : null;
-  }
-
   private void write(AuditRecord rec) {
     String json = rec.toJson();
     if (logToConsole) {
@@ -79,7 +48,9 @@ record AuditWriter(String targetTopic,
       producer.send(
           new ProducerRecord<>(targetTopic, null, json.getBytes(StandardCharsets.UTF_8)),
           (metadata, ex) -> {
-            log.warn("Error writing Audit record", ex);
+            if (ex != null) {
+              log.warn("Error sending Audit record to kafka for cluster {}", clusterName, ex);
+            }
           });
     }
   }
@@ -89,52 +60,6 @@ record AuditWriter(String targetTopic,
     Optional.ofNullable(producer).ifPresent(KafkaProducer::close);
   }
 
-  record AuditRecord(String timestamp,
-                     String userPrincipal,  //TODO: discuss - rename to username?
-                     String clusterName,
-                     List<AuditResource> resources,
-                     String operation,
-                     OperationResult result,
-                     @Nullable Object params) {
-
-    //TODO: do not render null
-    static final JsonMapper MAPPER = new JsonMapper();
-
-    @SneakyThrows
-    String toJson() {
-      return MAPPER.writeValueAsString(this);
-    }
-  }
-
-  record AuditResource(PermissibleAction accessType, Resource type, @Nullable Object id) {
-  }
-
-  record OperationResult(boolean success, OperationError error) {
-
-    static OperationResult successful() {
-      return new OperationResult(true, null);
-    }
-
-    static OperationResult error(Throwable th) {
-      OperationError err = OperationError.UNRECOGNIZED_ERROR;
-      if (th instanceof AccessDeniedException) {
-        err = OperationError.ACCESS_DENIED;
-      } else if (th instanceof ValidationException) {
-        err = OperationError.VALIDATION_ERROR;
-      } else if (th instanceof CustomBaseException) {
-        err = OperationError.EXECUTION_ERROR;
-      }
-      return new OperationResult(false, err);
-    }
-
-    enum OperationError {
-      ACCESS_DENIED,
-      VALIDATION_ERROR,
-      EXECUTION_ERROR,
-      UNRECOGNIZED_ERROR
-    }
-  }
-
 }
 
 

+ 0 - 56
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/rbac/AccessControlService.java

@@ -17,7 +17,6 @@ import com.provectus.kafka.ui.model.rbac.permission.ConnectAction;
 import com.provectus.kafka.ui.model.rbac.permission.ConsumerGroupAction;
 import com.provectus.kafka.ui.model.rbac.permission.SchemaAction;
 import com.provectus.kafka.ui.model.rbac.permission.TopicAction;
-import com.provectus.kafka.ui.service.audit.AuditService;
 import com.provectus.kafka.ui.service.rbac.extractor.CognitoAuthorityExtractor;
 import com.provectus.kafka.ui.service.rbac.extractor.GithubAuthorityExtractor;
 import com.provectus.kafka.ui.service.rbac.extractor.GoogleAuthorityExtractor;
@@ -54,8 +53,6 @@ import reactor.util.function.Tuples;
 @Slf4j
 public class AccessControlService {
 
-  private final AuditService auditService;
-
   @Nullable
   private final InMemoryReactiveClientRegistrationRepository clientRegistrationRepository;
   private final RoleBasedAccessControlProperties properties;
@@ -97,59 +94,6 @@ public class AccessControlService {
     }
   }
 
-  @RequiredArgsConstructor
-  public class OperationContext {
-
-    private final AccessContext accessContext;
-
-    public <T> Mono<T> then(Mono<T> mono) {
-      return validateAccessAndReturnUser(accessContext)
-          .flatMap(t -> {
-            if (t.getT2()) {
-              return mono
-                  .doOnSuccess(r -> auditService.sendAuditRecord(accessContext, t.getT1()))
-                  .doOnError(th -> true, th -> auditService.sendAuditRecord(accessContext, t.getT1(), th));
-            } else {
-              var ex = new AccessDeniedException("Access denied");
-              auditService.sendAuditRecord(accessContext, t.getT1(), ex);
-              return Mono.error(ex);
-            }
-          });
-    }
-
-    public <T> Mono<T> thenReturn(T objToReturn) {
-      return then(Mono.justOrEmpty(objToReturn));
-    }
-  }
-
-  public OperationContext withAccess(AccessContext context) {
-    return new OperationContext(context);
-  }
-
-  // [user, access granted flag]
-  private Mono<Tuple2<AuthenticatedUser, Boolean>> validateAccessAndReturnUser(AccessContext context) {
-    if (!rbacEnabled) {
-      return Mono.just(Tuples.of(new AuthenticatedUser("Unknown", Set.of()), true));
-    }
-
-    return getUser()
-        .map(user -> {
-          boolean accessGranted =
-              isApplicationConfigAccessible(context, user)
-                  && isClusterAccessible(context, user)
-                  && isClusterConfigAccessible(context, user)
-                  && isTopicAccessible(context, user)
-                  && isConsumerGroupAccessible(context, user)
-                  && isConnectAccessible(context, user)
-                  && isConnectorAccessible(context, user) // TODO connector selectors
-                  && isSchemaAccessible(context, user)
-                  && isKsqlAccessible(context, user)
-                  && isAclAccessible(context, user);
-
-          return Tuples.of(user, accessGranted);
-        });
-  }
-
   public Mono<Void> validateAccess(AccessContext context) {
     if (!rbacEnabled) {
       return Mono.empty();

+ 3 - 1
kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/SchemaRegistryPaginationTest.java

@@ -9,6 +9,7 @@ import static org.mockito.Mockito.when;
 import com.provectus.kafka.ui.controller.SchemasController;
 import com.provectus.kafka.ui.model.KafkaCluster;
 import com.provectus.kafka.ui.model.SchemaSubjectDTO;
+import com.provectus.kafka.ui.service.audit.AuditService;
 import com.provectus.kafka.ui.sr.model.Compatibility;
 import com.provectus.kafka.ui.sr.model.SchemaSubject;
 import com.provectus.kafka.ui.util.AccessControlServiceMock;
@@ -41,7 +42,8 @@ public class SchemaRegistryPaginationTest {
                 new SchemaRegistryService.SubjectWithCompatibilityLevel(
                     new SchemaSubject().subject(a.getArgument(1)), Compatibility.FULL)));
 
-    this.controller = new SchemasController(schemaRegistryService, new AccessControlServiceMock().getMock());
+    this.controller = new SchemasController(schemaRegistryService, new AccessControlServiceMock().getMock(),
+        mock(AuditService.class));
     this.controller.setClustersStorage(clustersStorage);
   }
 

+ 2 - 1
kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/TopicsServicePaginationTest.java

@@ -18,6 +18,7 @@ import com.provectus.kafka.ui.model.SortOrderDTO;
 import com.provectus.kafka.ui.model.TopicColumnsToSortDTO;
 import com.provectus.kafka.ui.model.TopicDTO;
 import com.provectus.kafka.ui.service.analyze.TopicAnalysisService;
+import com.provectus.kafka.ui.service.audit.AuditService;
 import com.provectus.kafka.ui.service.rbac.AccessControlService;
 import com.provectus.kafka.ui.util.AccessControlServiceMock;
 import java.util.ArrayList;
@@ -46,7 +47,7 @@ class TopicsServicePaginationTest {
   private final AccessControlService accessControlService = new AccessControlServiceMock().getMock();
 
   private final TopicsController topicsController = new TopicsController(
-      topicsService, mock(TopicAnalysisService.class), clusterMapper, accessControlService);
+      topicsService, mock(TopicAnalysisService.class), clusterMapper, accessControlService, mock(AuditService.class));
 
   private void init(Map<String, InternalTopic> topicsInCache) {