Explorar o código

Merge branch 'master' into No-data-polling-issue

Ilya Kuramshin hai 1 ano
pai
achega
1e9e7a0b11
Modificáronse 33 ficheiros con 549 adicións e 200 borrados
  1. 6 0
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/ClustersProperties.java
  2. 26 1
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/AbstractController.java
  3. 16 20
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/AclsController.java
  4. 9 13
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/ApplicationConfigController.java
  5. 12 17
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/BrokersController.java
  6. 6 10
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/ClustersController.java
  7. 10 14
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/ConsumerGroupsController.java
  8. 21 25
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/KafkaConnectController.java
  9. 7 11
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/KsqlController.java
  10. 7 11
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/MessagesController.java
  11. 21 25
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/SchemasController.java
  12. 25 29
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/TopicsController.java
  13. 11 1
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/rbac/permission/AclAction.java
  14. 7 0
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/rbac/permission/ApplicationConfigAction.java
  15. 11 1
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/rbac/permission/AuditAction.java
  16. 7 0
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/rbac/permission/ClusterConfigAction.java
  17. 7 0
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/rbac/permission/ConnectAction.java
  18. 7 1
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/rbac/permission/ConsumerGroupAction.java
  19. 10 1
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/rbac/permission/KsqlAction.java
  20. 5 0
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/rbac/permission/PermissibleAction.java
  21. 7 0
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/rbac/permission/SchemaAction.java
  22. 7 1
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/rbac/permission/TopicAction.java
  23. 2 0
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/SerdesInitializer.java
  24. 59 0
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/builtin/ProtobufRawSerde.java
  25. 14 9
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/audit/AuditRecord.java
  26. 7 4
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/audit/AuditService.java
  27. 5 0
      kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/audit/AuditWriter.java
  28. 108 0
      kafka-ui-api/src/test/java/com/provectus/kafka/ui/serdes/builtin/ProtobufRawSerdeTest.java
  29. 3 2
      kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/SchemaRegistryPaginationTest.java
  30. 4 2
      kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/TopicsServicePaginationTest.java
  31. 13 2
      kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/audit/AuditServiceTest.java
  32. 86 0
      kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/audit/AuditWriterTest.java
  33. 3 0
      kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml

+ 6 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/ClustersProperties.java

@@ -150,7 +150,13 @@ public class ClustersProperties {
     Integer auditTopicsPartitions;
     Integer auditTopicsPartitions;
     Boolean topicAuditEnabled;
     Boolean topicAuditEnabled;
     Boolean consoleAuditEnabled;
     Boolean consoleAuditEnabled;
+    LogLevel level;
     Map<String, String> auditTopicProperties;
     Map<String, String> auditTopicProperties;
+
+    public enum LogLevel {
+      ALL,
+      ALTER_ONLY //default
+    }
   }
   }
 
 
   @PostConstruct
   @PostConstruct

+ 26 - 1
kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/AbstractController.java

@@ -2,12 +2,19 @@ package com.provectus.kafka.ui.controller;
 
 
 import com.provectus.kafka.ui.exception.ClusterNotFoundException;
 import com.provectus.kafka.ui.exception.ClusterNotFoundException;
 import com.provectus.kafka.ui.model.KafkaCluster;
 import com.provectus.kafka.ui.model.KafkaCluster;
+import com.provectus.kafka.ui.model.rbac.AccessContext;
 import com.provectus.kafka.ui.service.ClustersStorage;
 import com.provectus.kafka.ui.service.ClustersStorage;
+import com.provectus.kafka.ui.service.audit.AuditService;
+import com.provectus.kafka.ui.service.rbac.AccessControlService;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Autowired;
+import reactor.core.publisher.Mono;
+import reactor.core.publisher.Signal;
 
 
 public abstract class AbstractController {
 public abstract class AbstractController {
 
 
-  private ClustersStorage clustersStorage;
+  protected ClustersStorage clustersStorage;
+  protected AccessControlService accessControlService;
+  protected AuditService auditService;
 
 
   protected KafkaCluster getCluster(String name) {
   protected KafkaCluster getCluster(String name) {
     return clustersStorage.getClusterByName(name)
     return clustersStorage.getClusterByName(name)
@@ -15,8 +22,26 @@ public abstract class AbstractController {
             String.format("Cluster with name '%s' not found", name)));
             String.format("Cluster with name '%s' not found", name)));
   }
   }
 
 
+  protected Mono<Void> validateAccess(AccessContext context) {
+    return accessControlService.validateAccess(context);
+  }
+
+  protected void audit(AccessContext acxt, Signal<?> sig) {
+    auditService.audit(acxt, sig);
+  }
+
   @Autowired
   @Autowired
   public void setClustersStorage(ClustersStorage clustersStorage) {
   public void setClustersStorage(ClustersStorage clustersStorage) {
     this.clustersStorage = clustersStorage;
     this.clustersStorage = clustersStorage;
   }
   }
+
+  @Autowired
+  public void setAccessControlService(AccessControlService accessControlService) {
+    this.accessControlService = accessControlService;
+  }
+
+  @Autowired
+  public void setAuditService(AuditService auditService) {
+    this.auditService = auditService;
+  }
 }
 }

+ 16 - 20
kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/AclsController.java

@@ -11,8 +11,6 @@ import com.provectus.kafka.ui.model.KafkaAclResourceTypeDTO;
 import com.provectus.kafka.ui.model.rbac.AccessContext;
 import com.provectus.kafka.ui.model.rbac.AccessContext;
 import com.provectus.kafka.ui.model.rbac.permission.AclAction;
 import com.provectus.kafka.ui.model.rbac.permission.AclAction;
 import com.provectus.kafka.ui.service.acl.AclsService;
 import com.provectus.kafka.ui.service.acl.AclsService;
-import com.provectus.kafka.ui.service.audit.AuditService;
-import com.provectus.kafka.ui.service.rbac.AccessControlService;
 import java.util.Optional;
 import java.util.Optional;
 import lombok.RequiredArgsConstructor;
 import lombok.RequiredArgsConstructor;
 import org.apache.kafka.common.resource.PatternType;
 import org.apache.kafka.common.resource.PatternType;
@@ -29,8 +27,6 @@ import reactor.core.publisher.Mono;
 public class AclsController extends AbstractController implements AclsApi {
 public class AclsController extends AbstractController implements AclsApi {
 
 
   private final AclsService aclsService;
   private final AclsService aclsService;
-  private final AccessControlService accessControlService;
-  private final AuditService auditService;
 
 
   @Override
   @Override
   public Mono<ResponseEntity<Void>> createAcl(String clusterName, Mono<KafkaAclDTO> kafkaAclDto,
   public Mono<ResponseEntity<Void>> createAcl(String clusterName, Mono<KafkaAclDTO> kafkaAclDto,
@@ -41,11 +37,11 @@ public class AclsController extends AbstractController implements AclsApi {
         .operationName("createAcl")
         .operationName("createAcl")
         .build();
         .build();
 
 
-    return accessControlService.validateAccess(context)
+    return validateAccess(context)
         .then(kafkaAclDto)
         .then(kafkaAclDto)
         .map(ClusterMapper::toAclBinding)
         .map(ClusterMapper::toAclBinding)
         .flatMap(binding -> aclsService.createAcl(getCluster(clusterName), binding))
         .flatMap(binding -> aclsService.createAcl(getCluster(clusterName), binding))
-        .doOnEach(sig -> auditService.audit(context, sig))
+        .doOnEach(sig -> audit(context, sig))
         .thenReturn(ResponseEntity.ok().build());
         .thenReturn(ResponseEntity.ok().build());
   }
   }
 
 
@@ -58,11 +54,11 @@ public class AclsController extends AbstractController implements AclsApi {
         .operationName("deleteAcl")
         .operationName("deleteAcl")
         .build();
         .build();
 
 
-    return accessControlService.validateAccess(context)
+    return validateAccess(context)
         .then(kafkaAclDto)
         .then(kafkaAclDto)
         .map(ClusterMapper::toAclBinding)
         .map(ClusterMapper::toAclBinding)
         .flatMap(binding -> aclsService.deleteAcl(getCluster(clusterName), binding))
         .flatMap(binding -> aclsService.deleteAcl(getCluster(clusterName), binding))
-        .doOnEach(sig -> auditService.audit(context, sig))
+        .doOnEach(sig -> audit(context, sig))
         .thenReturn(ResponseEntity.ok().build());
         .thenReturn(ResponseEntity.ok().build());
   }
   }
 
 
@@ -88,12 +84,12 @@ public class AclsController extends AbstractController implements AclsApi {
 
 
     var filter = new ResourcePatternFilter(resourceType, resourceName, namePatternType);
     var filter = new ResourcePatternFilter(resourceType, resourceName, namePatternType);
 
 
-    return accessControlService.validateAccess(context).then(
+    return validateAccess(context).then(
         Mono.just(
         Mono.just(
             ResponseEntity.ok(
             ResponseEntity.ok(
                 aclsService.listAcls(getCluster(clusterName), filter)
                 aclsService.listAcls(getCluster(clusterName), filter)
                     .map(ClusterMapper::toKafkaAclDto)))
                     .map(ClusterMapper::toKafkaAclDto)))
-    ).doOnEach(sig -> auditService.audit(context, sig));
+    ).doOnEach(sig -> audit(context, sig));
   }
   }
 
 
   @Override
   @Override
@@ -104,11 +100,11 @@ public class AclsController extends AbstractController implements AclsApi {
         .operationName("getAclAsCsv")
         .operationName("getAclAsCsv")
         .build();
         .build();
 
 
-    return accessControlService.validateAccess(context).then(
+    return validateAccess(context).then(
         aclsService.getAclAsCsvString(getCluster(clusterName))
         aclsService.getAclAsCsvString(getCluster(clusterName))
             .map(ResponseEntity::ok)
             .map(ResponseEntity::ok)
             .flatMap(Mono::just)
             .flatMap(Mono::just)
-            .doOnEach(sig -> auditService.audit(context, sig))
+            .doOnEach(sig -> audit(context, sig))
     );
     );
   }
   }
 
 
@@ -120,10 +116,10 @@ public class AclsController extends AbstractController implements AclsApi {
         .operationName("syncAclsCsv")
         .operationName("syncAclsCsv")
         .build();
         .build();
 
 
-    return accessControlService.validateAccess(context)
+    return validateAccess(context)
         .then(csvMono)
         .then(csvMono)
         .flatMap(csv -> aclsService.syncAclWithAclCsv(getCluster(clusterName), csv))
         .flatMap(csv -> aclsService.syncAclWithAclCsv(getCluster(clusterName), csv))
-        .doOnEach(sig -> auditService.audit(context, sig))
+        .doOnEach(sig -> audit(context, sig))
         .thenReturn(ResponseEntity.ok().build());
         .thenReturn(ResponseEntity.ok().build());
   }
   }
 
 
@@ -137,10 +133,10 @@ public class AclsController extends AbstractController implements AclsApi {
         .operationName("createConsumerAcl")
         .operationName("createConsumerAcl")
         .build();
         .build();
 
 
-    return accessControlService.validateAccess(context)
+    return validateAccess(context)
         .then(createConsumerAclDto)
         .then(createConsumerAclDto)
         .flatMap(req -> aclsService.createConsumerAcl(getCluster(clusterName), req))
         .flatMap(req -> aclsService.createConsumerAcl(getCluster(clusterName), req))
-        .doOnEach(sig -> auditService.audit(context, sig))
+        .doOnEach(sig -> audit(context, sig))
         .thenReturn(ResponseEntity.ok().build());
         .thenReturn(ResponseEntity.ok().build());
   }
   }
 
 
@@ -154,10 +150,10 @@ public class AclsController extends AbstractController implements AclsApi {
         .operationName("createProducerAcl")
         .operationName("createProducerAcl")
         .build();
         .build();
 
 
-    return accessControlService.validateAccess(context)
+    return validateAccess(context)
         .then(createProducerAclDto)
         .then(createProducerAclDto)
         .flatMap(req -> aclsService.createProducerAcl(getCluster(clusterName), req))
         .flatMap(req -> aclsService.createProducerAcl(getCluster(clusterName), req))
-        .doOnEach(sig -> auditService.audit(context, sig))
+        .doOnEach(sig -> audit(context, sig))
         .thenReturn(ResponseEntity.ok().build());
         .thenReturn(ResponseEntity.ok().build());
   }
   }
 
 
@@ -171,10 +167,10 @@ public class AclsController extends AbstractController implements AclsApi {
         .operationName("createStreamAppAcl")
         .operationName("createStreamAppAcl")
         .build();
         .build();
 
 
-    return accessControlService.validateAccess(context)
+    return validateAccess(context)
         .then(createStreamAppAclDto)
         .then(createStreamAppAclDto)
         .flatMap(req -> aclsService.createStreamAppAcl(getCluster(clusterName), req))
         .flatMap(req -> aclsService.createStreamAppAcl(getCluster(clusterName), req))
-        .doOnEach(sig -> auditService.audit(context, sig))
+        .doOnEach(sig -> audit(context, sig))
         .thenReturn(ResponseEntity.ok().build());
         .thenReturn(ResponseEntity.ok().build());
   }
   }
 }
 }

+ 9 - 13
kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/ApplicationConfigController.java

@@ -15,8 +15,6 @@ import com.provectus.kafka.ui.model.UploadedFileInfoDTO;
 import com.provectus.kafka.ui.model.rbac.AccessContext;
 import com.provectus.kafka.ui.model.rbac.AccessContext;
 import com.provectus.kafka.ui.service.ApplicationInfoService;
 import com.provectus.kafka.ui.service.ApplicationInfoService;
 import com.provectus.kafka.ui.service.KafkaClusterFactory;
 import com.provectus.kafka.ui.service.KafkaClusterFactory;
-import com.provectus.kafka.ui.service.audit.AuditService;
-import com.provectus.kafka.ui.service.rbac.AccessControlService;
 import com.provectus.kafka.ui.util.ApplicationRestarter;
 import com.provectus.kafka.ui.util.ApplicationRestarter;
 import com.provectus.kafka.ui.util.DynamicConfigOperations;
 import com.provectus.kafka.ui.util.DynamicConfigOperations;
 import com.provectus.kafka.ui.util.DynamicConfigOperations.PropertiesStructure;
 import com.provectus.kafka.ui.util.DynamicConfigOperations.PropertiesStructure;
@@ -39,7 +37,7 @@ import reactor.util.function.Tuples;
 @Slf4j
 @Slf4j
 @RestController
 @RestController
 @RequiredArgsConstructor
 @RequiredArgsConstructor
-public class ApplicationConfigController implements ApplicationConfigApi {
+public class ApplicationConfigController extends AbstractController implements ApplicationConfigApi {
 
 
   private static final PropertiesMapper MAPPER = Mappers.getMapper(PropertiesMapper.class);
   private static final PropertiesMapper MAPPER = Mappers.getMapper(PropertiesMapper.class);
 
 
@@ -51,12 +49,10 @@ public class ApplicationConfigController implements ApplicationConfigApi {
     ApplicationConfigPropertiesDTO toDto(PropertiesStructure propertiesStructure);
     ApplicationConfigPropertiesDTO toDto(PropertiesStructure propertiesStructure);
   }
   }
 
 
-  private final AccessControlService accessControlService;
   private final DynamicConfigOperations dynamicConfigOperations;
   private final DynamicConfigOperations dynamicConfigOperations;
   private final ApplicationRestarter restarter;
   private final ApplicationRestarter restarter;
   private final KafkaClusterFactory kafkaClusterFactory;
   private final KafkaClusterFactory kafkaClusterFactory;
   private final ApplicationInfoService applicationInfoService;
   private final ApplicationInfoService applicationInfoService;
-  private final AuditService auditService;
 
 
   @Override
   @Override
   public Mono<ResponseEntity<ApplicationInfoDTO>> getApplicationInfo(ServerWebExchange exchange) {
   public Mono<ResponseEntity<ApplicationInfoDTO>> getApplicationInfo(ServerWebExchange exchange) {
@@ -69,12 +65,12 @@ public class ApplicationConfigController implements ApplicationConfigApi {
         .applicationConfigActions(VIEW)
         .applicationConfigActions(VIEW)
         .operationName("getCurrentConfig")
         .operationName("getCurrentConfig")
         .build();
         .build();
-    return accessControlService.validateAccess(context)
+    return validateAccess(context)
         .then(Mono.fromSupplier(() -> ResponseEntity.ok(
         .then(Mono.fromSupplier(() -> ResponseEntity.ok(
             new ApplicationConfigDTO()
             new ApplicationConfigDTO()
                 .properties(MAPPER.toDto(dynamicConfigOperations.getCurrentProperties()))
                 .properties(MAPPER.toDto(dynamicConfigOperations.getCurrentProperties()))
         )))
         )))
-        .doOnEach(sig -> auditService.audit(context, sig));
+        .doOnEach(sig -> audit(context, sig));
   }
   }
 
 
   @Override
   @Override
@@ -84,14 +80,14 @@ public class ApplicationConfigController implements ApplicationConfigApi {
         .applicationConfigActions(EDIT)
         .applicationConfigActions(EDIT)
         .operationName("restartWithConfig")
         .operationName("restartWithConfig")
         .build();
         .build();
-    return accessControlService.validateAccess(context)
+    return validateAccess(context)
         .then(restartRequestDto)
         .then(restartRequestDto)
         .<ResponseEntity<Void>>map(dto -> {
         .<ResponseEntity<Void>>map(dto -> {
           dynamicConfigOperations.persist(MAPPER.fromDto(dto.getConfig().getProperties()));
           dynamicConfigOperations.persist(MAPPER.fromDto(dto.getConfig().getProperties()));
           restarter.requestRestart();
           restarter.requestRestart();
           return ResponseEntity.ok().build();
           return ResponseEntity.ok().build();
         })
         })
-        .doOnEach(sig -> auditService.audit(context, sig));
+        .doOnEach(sig -> audit(context, sig));
   }
   }
 
 
   @Override
   @Override
@@ -101,13 +97,13 @@ public class ApplicationConfigController implements ApplicationConfigApi {
         .applicationConfigActions(EDIT)
         .applicationConfigActions(EDIT)
         .operationName("uploadConfigRelatedFile")
         .operationName("uploadConfigRelatedFile")
         .build();
         .build();
-    return accessControlService.validateAccess(context)
+    return validateAccess(context)
         .then(fileFlux.single())
         .then(fileFlux.single())
         .flatMap(file ->
         .flatMap(file ->
             dynamicConfigOperations.uploadConfigRelatedFile((FilePart) file)
             dynamicConfigOperations.uploadConfigRelatedFile((FilePart) file)
                 .map(path -> new UploadedFileInfoDTO().location(path.toString()))
                 .map(path -> new UploadedFileInfoDTO().location(path.toString()))
                 .map(ResponseEntity::ok))
                 .map(ResponseEntity::ok))
-        .doOnEach(sig -> auditService.audit(context, sig));
+        .doOnEach(sig -> audit(context, sig));
   }
   }
 
 
   @Override
   @Override
@@ -117,7 +113,7 @@ public class ApplicationConfigController implements ApplicationConfigApi {
         .applicationConfigActions(EDIT)
         .applicationConfigActions(EDIT)
         .operationName("validateConfig")
         .operationName("validateConfig")
         .build();
         .build();
-    return accessControlService.validateAccess(context)
+    return validateAccess(context)
         .then(configDto)
         .then(configDto)
         .flatMap(config -> {
         .flatMap(config -> {
           PropertiesStructure propertiesStructure = MAPPER.fromDto(config.getProperties());
           PropertiesStructure propertiesStructure = MAPPER.fromDto(config.getProperties());
@@ -126,7 +122,7 @@ public class ApplicationConfigController implements ApplicationConfigApi {
               .map(validations -> new ApplicationConfigValidationDTO().clusters(validations));
               .map(validations -> new ApplicationConfigValidationDTO().clusters(validations));
         })
         })
         .map(ResponseEntity::ok)
         .map(ResponseEntity::ok)
-        .doOnEach(sig -> auditService.audit(context, sig));
+        .doOnEach(sig -> audit(context, sig));
   }
   }
 
 
   private Mono<Map<String, ClusterConfigValidationDTO>> validateClustersConfig(
   private Mono<Map<String, ClusterConfigValidationDTO>> validateClustersConfig(

+ 12 - 17
kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/BrokersController.java

@@ -11,8 +11,6 @@ import com.provectus.kafka.ui.model.BrokersLogdirsDTO;
 import com.provectus.kafka.ui.model.rbac.AccessContext;
 import com.provectus.kafka.ui.model.rbac.AccessContext;
 import com.provectus.kafka.ui.model.rbac.permission.ClusterConfigAction;
 import com.provectus.kafka.ui.model.rbac.permission.ClusterConfigAction;
 import com.provectus.kafka.ui.service.BrokerService;
 import com.provectus.kafka.ui.service.BrokerService;
-import com.provectus.kafka.ui.service.audit.AuditService;
-import com.provectus.kafka.ui.service.rbac.AccessControlService;
 import java.util.List;
 import java.util.List;
 import java.util.Map;
 import java.util.Map;
 import javax.annotation.Nullable;
 import javax.annotation.Nullable;
@@ -31,9 +29,6 @@ public class BrokersController extends AbstractController implements BrokersApi
   private final BrokerService brokerService;
   private final BrokerService brokerService;
   private final ClusterMapper clusterMapper;
   private final ClusterMapper clusterMapper;
 
 
-  private final AuditService auditService;
-  private final AccessControlService accessControlService;
-
   @Override
   @Override
   public Mono<ResponseEntity<Flux<BrokerDTO>>> getBrokers(String clusterName,
   public Mono<ResponseEntity<Flux<BrokerDTO>>> getBrokers(String clusterName,
                                                           ServerWebExchange exchange) {
                                                           ServerWebExchange exchange) {
@@ -43,9 +38,9 @@ public class BrokersController extends AbstractController implements BrokersApi
         .build();
         .build();
 
 
     var job = brokerService.getBrokers(getCluster(clusterName)).map(clusterMapper::toBrokerDto);
     var job = brokerService.getBrokers(getCluster(clusterName)).map(clusterMapper::toBrokerDto);
-    return accessControlService.validateAccess(context)
+    return validateAccess(context)
         .thenReturn(ResponseEntity.ok(job))
         .thenReturn(ResponseEntity.ok(job))
-        .doOnEach(sig -> auditService.audit(context, sig));
+        .doOnEach(sig -> audit(context, sig));
   }
   }
 
 
   @Override
   @Override
@@ -57,14 +52,14 @@ public class BrokersController extends AbstractController implements BrokersApi
         .operationParams(Map.of("id", id))
         .operationParams(Map.of("id", id))
         .build();
         .build();
 
 
-    return accessControlService.validateAccess(context)
+    return validateAccess(context)
         .then(
         .then(
             brokerService.getBrokerMetrics(getCluster(clusterName), id)
             brokerService.getBrokerMetrics(getCluster(clusterName), id)
                 .map(clusterMapper::toBrokerMetrics)
                 .map(clusterMapper::toBrokerMetrics)
                 .map(ResponseEntity::ok)
                 .map(ResponseEntity::ok)
                 .onErrorReturn(ResponseEntity.notFound().build())
                 .onErrorReturn(ResponseEntity.notFound().build())
         )
         )
-        .doOnEach(sig -> auditService.audit(context, sig));
+        .doOnEach(sig -> audit(context, sig));
   }
   }
 
 
   @Override
   @Override
@@ -80,10 +75,10 @@ public class BrokersController extends AbstractController implements BrokersApi
         .operationParams(Map.of("brokerIds", brokerIds))
         .operationParams(Map.of("brokerIds", brokerIds))
         .build();
         .build();
 
 
-    return accessControlService.validateAccess(context)
+    return validateAccess(context)
         .thenReturn(ResponseEntity.ok(
         .thenReturn(ResponseEntity.ok(
             brokerService.getAllBrokersLogdirs(getCluster(clusterName), brokerIds)))
             brokerService.getAllBrokersLogdirs(getCluster(clusterName), brokerIds)))
-        .doOnEach(sig -> auditService.audit(context, sig));
+        .doOnEach(sig -> audit(context, sig));
   }
   }
 
 
   @Override
   @Override
@@ -97,11 +92,11 @@ public class BrokersController extends AbstractController implements BrokersApi
         .operationParams(Map.of("brokerId", id))
         .operationParams(Map.of("brokerId", id))
         .build();
         .build();
 
 
-    return accessControlService.validateAccess(context).thenReturn(
+    return validateAccess(context).thenReturn(
         ResponseEntity.ok(
         ResponseEntity.ok(
             brokerService.getBrokerConfig(getCluster(clusterName), id)
             brokerService.getBrokerConfig(getCluster(clusterName), id)
                 .map(clusterMapper::toBrokerConfig))
                 .map(clusterMapper::toBrokerConfig))
-    ).doOnEach(sig -> auditService.audit(context, sig));
+    ).doOnEach(sig -> audit(context, sig));
   }
   }
 
 
   @Override
   @Override
@@ -116,11 +111,11 @@ public class BrokersController extends AbstractController implements BrokersApi
         .operationParams(Map.of("brokerId", id))
         .operationParams(Map.of("brokerId", id))
         .build();
         .build();
 
 
-    return accessControlService.validateAccess(context).then(
+    return validateAccess(context).then(
         brokerLogdir
         brokerLogdir
             .flatMap(bld -> brokerService.updateBrokerLogDir(getCluster(clusterName), id, bld))
             .flatMap(bld -> brokerService.updateBrokerLogDir(getCluster(clusterName), id, bld))
             .map(ResponseEntity::ok)
             .map(ResponseEntity::ok)
-    ).doOnEach(sig -> auditService.audit(context, sig));
+    ).doOnEach(sig -> audit(context, sig));
   }
   }
 
 
   @Override
   @Override
@@ -136,11 +131,11 @@ public class BrokersController extends AbstractController implements BrokersApi
         .operationParams(Map.of("brokerId", id))
         .operationParams(Map.of("brokerId", id))
         .build();
         .build();
 
 
-    return accessControlService.validateAccess(context).then(
+    return validateAccess(context).then(
         brokerConfig
         brokerConfig
             .flatMap(bci -> brokerService.updateBrokerConfigByName(
             .flatMap(bci -> brokerService.updateBrokerConfigByName(
                 getCluster(clusterName), id, name, bci.getValue()))
                 getCluster(clusterName), id, name, bci.getValue()))
             .map(ResponseEntity::ok)
             .map(ResponseEntity::ok)
-    ).doOnEach(sig -> auditService.audit(context, sig));
+    ).doOnEach(sig -> audit(context, sig));
   }
   }
 }
 }

+ 6 - 10
kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/ClustersController.java

@@ -6,8 +6,6 @@ import com.provectus.kafka.ui.model.ClusterMetricsDTO;
 import com.provectus.kafka.ui.model.ClusterStatsDTO;
 import com.provectus.kafka.ui.model.ClusterStatsDTO;
 import com.provectus.kafka.ui.model.rbac.AccessContext;
 import com.provectus.kafka.ui.model.rbac.AccessContext;
 import com.provectus.kafka.ui.service.ClusterService;
 import com.provectus.kafka.ui.service.ClusterService;
-import com.provectus.kafka.ui.service.audit.AuditService;
-import com.provectus.kafka.ui.service.rbac.AccessControlService;
 import lombok.RequiredArgsConstructor;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.http.ResponseEntity;
 import org.springframework.http.ResponseEntity;
@@ -21,8 +19,6 @@ import reactor.core.publisher.Mono;
 @Slf4j
 @Slf4j
 public class ClustersController extends AbstractController implements ClustersApi {
 public class ClustersController extends AbstractController implements ClustersApi {
   private final ClusterService clusterService;
   private final ClusterService clusterService;
-  private final AccessControlService accessControlService;
-  private final AuditService auditService;
 
 
   @Override
   @Override
   public Mono<ResponseEntity<Flux<ClusterDTO>>> getClusters(ServerWebExchange exchange) {
   public Mono<ResponseEntity<Flux<ClusterDTO>>> getClusters(ServerWebExchange exchange) {
@@ -40,13 +36,13 @@ public class ClustersController extends AbstractController implements ClustersAp
         .operationName("getClusterMetrics")
         .operationName("getClusterMetrics")
         .build();
         .build();
 
 
-    return accessControlService.validateAccess(context)
+    return validateAccess(context)
         .then(
         .then(
             clusterService.getClusterMetrics(getCluster(clusterName))
             clusterService.getClusterMetrics(getCluster(clusterName))
                 .map(ResponseEntity::ok)
                 .map(ResponseEntity::ok)
                 .onErrorReturn(ResponseEntity.notFound().build())
                 .onErrorReturn(ResponseEntity.notFound().build())
         )
         )
-        .doOnEach(sig -> auditService.audit(context, sig));
+        .doOnEach(sig -> audit(context, sig));
   }
   }
 
 
   @Override
   @Override
@@ -57,13 +53,13 @@ public class ClustersController extends AbstractController implements ClustersAp
         .operationName("getClusterStats")
         .operationName("getClusterStats")
         .build();
         .build();
 
 
-    return accessControlService.validateAccess(context)
+    return validateAccess(context)
         .then(
         .then(
             clusterService.getClusterStats(getCluster(clusterName))
             clusterService.getClusterStats(getCluster(clusterName))
                 .map(ResponseEntity::ok)
                 .map(ResponseEntity::ok)
                 .onErrorReturn(ResponseEntity.notFound().build())
                 .onErrorReturn(ResponseEntity.notFound().build())
         )
         )
-        .doOnEach(sig -> auditService.audit(context, sig));
+        .doOnEach(sig -> audit(context, sig));
   }
   }
 
 
   @Override
   @Override
@@ -75,8 +71,8 @@ public class ClustersController extends AbstractController implements ClustersAp
         .operationName("updateClusterInfo")
         .operationName("updateClusterInfo")
         .build();
         .build();
 
 
-    return accessControlService.validateAccess(context)
+    return validateAccess(context)
         .then(clusterService.updateCluster(getCluster(clusterName)).map(ResponseEntity::ok))
         .then(clusterService.updateCluster(getCluster(clusterName)).map(ResponseEntity::ok))
-        .doOnEach(sig -> auditService.audit(context, sig));
+        .doOnEach(sig -> audit(context, sig));
   }
   }
 }
 }

+ 10 - 14
kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/ConsumerGroupsController.java

@@ -19,8 +19,6 @@ import com.provectus.kafka.ui.model.rbac.AccessContext;
 import com.provectus.kafka.ui.model.rbac.permission.TopicAction;
 import com.provectus.kafka.ui.model.rbac.permission.TopicAction;
 import com.provectus.kafka.ui.service.ConsumerGroupService;
 import com.provectus.kafka.ui.service.ConsumerGroupService;
 import com.provectus.kafka.ui.service.OffsetsResetService;
 import com.provectus.kafka.ui.service.OffsetsResetService;
-import com.provectus.kafka.ui.service.audit.AuditService;
-import com.provectus.kafka.ui.service.rbac.AccessControlService;
 import java.util.Map;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Optional;
 import java.util.function.Supplier;
 import java.util.function.Supplier;
@@ -42,8 +40,6 @@ public class ConsumerGroupsController extends AbstractController implements Cons
 
 
   private final ConsumerGroupService consumerGroupService;
   private final ConsumerGroupService consumerGroupService;
   private final OffsetsResetService offsetsResetService;
   private final OffsetsResetService offsetsResetService;
-  private final AccessControlService accessControlService;
-  private final AuditService auditService;
 
 
   @Value("${consumer.groups.page.size:25}")
   @Value("${consumer.groups.page.size:25}")
   private int defaultConsumerGroupsPageSize;
   private int defaultConsumerGroupsPageSize;
@@ -59,9 +55,9 @@ public class ConsumerGroupsController extends AbstractController implements Cons
         .operationName("deleteConsumerGroup")
         .operationName("deleteConsumerGroup")
         .build();
         .build();
 
 
-    return accessControlService.validateAccess(context)
+    return validateAccess(context)
         .then(consumerGroupService.deleteConsumerGroupById(getCluster(clusterName), id))
         .then(consumerGroupService.deleteConsumerGroupById(getCluster(clusterName), id))
-        .doOnEach(sig -> auditService.audit(context, sig))
+        .doOnEach(sig -> audit(context, sig))
         .thenReturn(ResponseEntity.ok().build());
         .thenReturn(ResponseEntity.ok().build());
   }
   }
 
 
@@ -76,11 +72,11 @@ public class ConsumerGroupsController extends AbstractController implements Cons
         .operationName("getConsumerGroup")
         .operationName("getConsumerGroup")
         .build();
         .build();
 
 
-    return accessControlService.validateAccess(context)
+    return validateAccess(context)
         .then(consumerGroupService.getConsumerGroupDetail(getCluster(clusterName), consumerGroupId)
         .then(consumerGroupService.getConsumerGroupDetail(getCluster(clusterName), consumerGroupId)
             .map(ConsumerGroupMapper::toDetailsDto)
             .map(ConsumerGroupMapper::toDetailsDto)
             .map(ResponseEntity::ok))
             .map(ResponseEntity::ok))
-        .doOnEach(sig -> auditService.audit(context, sig));
+        .doOnEach(sig -> audit(context, sig));
   }
   }
 
 
   @Override
   @Override
@@ -104,9 +100,9 @@ public class ConsumerGroupsController extends AbstractController implements Cons
             .map(ResponseEntity::ok)
             .map(ResponseEntity::ok)
             .switchIfEmpty(Mono.just(ResponseEntity.notFound().build()));
             .switchIfEmpty(Mono.just(ResponseEntity.notFound().build()));
 
 
-    return accessControlService.validateAccess(context)
+    return validateAccess(context)
         .then(job)
         .then(job)
-        .doOnEach(sig -> auditService.audit(context, sig));
+        .doOnEach(sig -> audit(context, sig));
   }
   }
 
 
   @Override
   @Override
@@ -125,7 +121,7 @@ public class ConsumerGroupsController extends AbstractController implements Cons
         .operationName("getConsumerGroupsPage")
         .operationName("getConsumerGroupsPage")
         .build();
         .build();
 
 
-    return accessControlService.validateAccess(context).then(
+    return validateAccess(context).then(
         consumerGroupService.getConsumerGroupsPage(
         consumerGroupService.getConsumerGroupsPage(
                 getCluster(clusterName),
                 getCluster(clusterName),
                 Optional.ofNullable(page).filter(i -> i > 0).orElse(1),
                 Optional.ofNullable(page).filter(i -> i > 0).orElse(1),
@@ -136,7 +132,7 @@ public class ConsumerGroupsController extends AbstractController implements Cons
             )
             )
             .map(this::convertPage)
             .map(this::convertPage)
             .map(ResponseEntity::ok)
             .map(ResponseEntity::ok)
-    ).doOnEach(sig -> auditService.audit(context, sig));
+    ).doOnEach(sig -> audit(context, sig));
   }
   }
 
 
   @Override
   @Override
@@ -191,9 +187,9 @@ public class ConsumerGroupsController extends AbstractController implements Cons
         }
         }
       };
       };
 
 
-      return accessControlService.validateAccess(context)
+      return validateAccess(context)
           .then(mono.get())
           .then(mono.get())
-          .doOnEach(sig -> auditService.audit(context, sig));
+          .doOnEach(sig -> audit(context, sig));
     }).thenReturn(ResponseEntity.ok().build());
     }).thenReturn(ResponseEntity.ok().build());
   }
   }
 
 

+ 21 - 25
kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/KafkaConnectController.java

@@ -18,8 +18,6 @@ import com.provectus.kafka.ui.model.TaskDTO;
 import com.provectus.kafka.ui.model.rbac.AccessContext;
 import com.provectus.kafka.ui.model.rbac.AccessContext;
 import com.provectus.kafka.ui.model.rbac.permission.ConnectAction;
 import com.provectus.kafka.ui.model.rbac.permission.ConnectAction;
 import com.provectus.kafka.ui.service.KafkaConnectService;
 import com.provectus.kafka.ui.service.KafkaConnectService;
-import com.provectus.kafka.ui.service.audit.AuditService;
-import com.provectus.kafka.ui.service.rbac.AccessControlService;
 import java.util.Comparator;
 import java.util.Comparator;
 import java.util.Map;
 import java.util.Map;
 import java.util.Set;
 import java.util.Set;
@@ -40,8 +38,6 @@ public class KafkaConnectController extends AbstractController implements KafkaC
       = Set.of(RESTART, RESTART_FAILED_TASKS, RESTART_ALL_TASKS);
       = Set.of(RESTART, RESTART_FAILED_TASKS, RESTART_ALL_TASKS);
 
 
   private final KafkaConnectService kafkaConnectService;
   private final KafkaConnectService kafkaConnectService;
-  private final AccessControlService accessControlService;
-  private final AuditService auditService;
 
 
   @Override
   @Override
   public Mono<ResponseEntity<Flux<ConnectDTO>>> getConnects(String clusterName,
   public Mono<ResponseEntity<Flux<ConnectDTO>>> getConnects(String clusterName,
@@ -64,9 +60,9 @@ public class KafkaConnectController extends AbstractController implements KafkaC
         .operationName("getConnectors")
         .operationName("getConnectors")
         .build();
         .build();
 
 
-    return accessControlService.validateAccess(context)
+    return validateAccess(context)
         .thenReturn(ResponseEntity.ok(kafkaConnectService.getConnectorNames(getCluster(clusterName), connectName)))
         .thenReturn(ResponseEntity.ok(kafkaConnectService.getConnectorNames(getCluster(clusterName), connectName)))
-        .doOnEach(sig -> auditService.audit(context, sig));
+        .doOnEach(sig -> audit(context, sig));
   }
   }
 
 
   @Override
   @Override
@@ -81,10 +77,10 @@ public class KafkaConnectController extends AbstractController implements KafkaC
         .operationName("createConnector")
         .operationName("createConnector")
         .build();
         .build();
 
 
-    return accessControlService.validateAccess(context).then(
+    return validateAccess(context).then(
         kafkaConnectService.createConnector(getCluster(clusterName), connectName, connector)
         kafkaConnectService.createConnector(getCluster(clusterName), connectName, connector)
             .map(ResponseEntity::ok)
             .map(ResponseEntity::ok)
-    ).doOnEach(sig -> auditService.audit(context, sig));
+    ).doOnEach(sig -> audit(context, sig));
   }
   }
 
 
   @Override
   @Override
@@ -100,10 +96,10 @@ public class KafkaConnectController extends AbstractController implements KafkaC
         .operationName("getConnector")
         .operationName("getConnector")
         .build();
         .build();
 
 
-    return accessControlService.validateAccess(context).then(
+    return validateAccess(context).then(
         kafkaConnectService.getConnector(getCluster(clusterName), connectName, connectorName)
         kafkaConnectService.getConnector(getCluster(clusterName), connectName, connectorName)
             .map(ResponseEntity::ok)
             .map(ResponseEntity::ok)
-    ).doOnEach(sig -> auditService.audit(context, sig));
+    ).doOnEach(sig -> audit(context, sig));
   }
   }
 
 
   @Override
   @Override
@@ -119,10 +115,10 @@ public class KafkaConnectController extends AbstractController implements KafkaC
         .operationParams(Map.of("connectorName", connectName))
         .operationParams(Map.of("connectorName", connectName))
         .build();
         .build();
 
 
-    return accessControlService.validateAccess(context).then(
+    return validateAccess(context).then(
         kafkaConnectService.deleteConnector(getCluster(clusterName), connectName, connectorName)
         kafkaConnectService.deleteConnector(getCluster(clusterName), connectName, connectorName)
             .map(ResponseEntity::ok)
             .map(ResponseEntity::ok)
-    ).doOnEach(sig -> auditService.audit(context, sig));
+    ).doOnEach(sig -> audit(context, sig));
   }
   }
 
 
 
 
@@ -150,7 +146,7 @@ public class KafkaConnectController extends AbstractController implements KafkaC
         .sort(comparator);
         .sort(comparator);
 
 
     return Mono.just(ResponseEntity.ok(job))
     return Mono.just(ResponseEntity.ok(job))
-        .doOnEach(sig -> auditService.audit(context, sig));
+        .doOnEach(sig -> audit(context, sig));
   }
   }
 
 
   @Override
   @Override
@@ -166,11 +162,11 @@ public class KafkaConnectController extends AbstractController implements KafkaC
         .operationName("getConnectorConfig")
         .operationName("getConnectorConfig")
         .build();
         .build();
 
 
-    return accessControlService.validateAccess(context).then(
+    return validateAccess(context).then(
         kafkaConnectService
         kafkaConnectService
             .getConnectorConfig(getCluster(clusterName), connectName, connectorName)
             .getConnectorConfig(getCluster(clusterName), connectName, connectorName)
             .map(ResponseEntity::ok)
             .map(ResponseEntity::ok)
-    ).doOnEach(sig -> auditService.audit(context, sig));
+    ).doOnEach(sig -> audit(context, sig));
   }
   }
 
 
   @Override
   @Override
@@ -187,11 +183,11 @@ public class KafkaConnectController extends AbstractController implements KafkaC
         .operationParams(Map.of("connectorName", connectorName))
         .operationParams(Map.of("connectorName", connectorName))
         .build();
         .build();
 
 
-    return accessControlService.validateAccess(context).then(
+    return validateAccess(context).then(
             kafkaConnectService
             kafkaConnectService
                 .setConnectorConfig(getCluster(clusterName), connectName, connectorName, requestBody)
                 .setConnectorConfig(getCluster(clusterName), connectName, connectorName, requestBody)
                 .map(ResponseEntity::ok))
                 .map(ResponseEntity::ok))
-        .doOnEach(sig -> auditService.audit(context, sig));
+        .doOnEach(sig -> audit(context, sig));
   }
   }
 
 
   @Override
   @Override
@@ -214,11 +210,11 @@ public class KafkaConnectController extends AbstractController implements KafkaC
         .operationParams(Map.of("connectorName", connectorName))
         .operationParams(Map.of("connectorName", connectorName))
         .build();
         .build();
 
 
-    return accessControlService.validateAccess(context).then(
+    return validateAccess(context).then(
         kafkaConnectService
         kafkaConnectService
             .updateConnectorState(getCluster(clusterName), connectName, connectorName, action)
             .updateConnectorState(getCluster(clusterName), connectName, connectorName, action)
             .map(ResponseEntity::ok)
             .map(ResponseEntity::ok)
-    ).doOnEach(sig -> auditService.audit(context, sig));
+    ).doOnEach(sig -> audit(context, sig));
   }
   }
 
 
   @Override
   @Override
@@ -234,11 +230,11 @@ public class KafkaConnectController extends AbstractController implements KafkaC
         .operationParams(Map.of("connectorName", connectorName))
         .operationParams(Map.of("connectorName", connectorName))
         .build();
         .build();
 
 
-    return accessControlService.validateAccess(context).thenReturn(
+    return validateAccess(context).thenReturn(
         ResponseEntity
         ResponseEntity
             .ok(kafkaConnectService
             .ok(kafkaConnectService
                 .getConnectorTasks(getCluster(clusterName), connectName, connectorName))
                 .getConnectorTasks(getCluster(clusterName), connectName, connectorName))
-    ).doOnEach(sig -> auditService.audit(context, sig));
+    ).doOnEach(sig -> audit(context, sig));
   }
   }
 
 
   @Override
   @Override
@@ -254,11 +250,11 @@ public class KafkaConnectController extends AbstractController implements KafkaC
         .operationParams(Map.of("connectorName", connectorName))
         .operationParams(Map.of("connectorName", connectorName))
         .build();
         .build();
 
 
-    return accessControlService.validateAccess(context).then(
+    return validateAccess(context).then(
         kafkaConnectService
         kafkaConnectService
             .restartConnectorTask(getCluster(clusterName), connectName, connectorName, taskId)
             .restartConnectorTask(getCluster(clusterName), connectName, connectorName, taskId)
             .map(ResponseEntity::ok)
             .map(ResponseEntity::ok)
-    ).doOnEach(sig -> auditService.audit(context, sig));
+    ).doOnEach(sig -> audit(context, sig));
   }
   }
 
 
   @Override
   @Override
@@ -272,11 +268,11 @@ public class KafkaConnectController extends AbstractController implements KafkaC
         .operationName("getConnectorPlugins")
         .operationName("getConnectorPlugins")
         .build();
         .build();
 
 
-    return accessControlService.validateAccess(context).then(
+    return validateAccess(context).then(
         Mono.just(
         Mono.just(
             ResponseEntity.ok(
             ResponseEntity.ok(
                 kafkaConnectService.getConnectorPlugins(getCluster(clusterName), connectName)))
                 kafkaConnectService.getConnectorPlugins(getCluster(clusterName), connectName)))
-    ).doOnEach(sig -> auditService.audit(context, sig));
+    ).doOnEach(sig -> audit(context, sig));
   }
   }
 
 
   @Override
   @Override

+ 7 - 11
kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/KsqlController.java

@@ -9,9 +9,7 @@ import com.provectus.kafka.ui.model.KsqlTableDescriptionDTO;
 import com.provectus.kafka.ui.model.KsqlTableResponseDTO;
 import com.provectus.kafka.ui.model.KsqlTableResponseDTO;
 import com.provectus.kafka.ui.model.rbac.AccessContext;
 import com.provectus.kafka.ui.model.rbac.AccessContext;
 import com.provectus.kafka.ui.model.rbac.permission.KsqlAction;
 import com.provectus.kafka.ui.model.rbac.permission.KsqlAction;
-import com.provectus.kafka.ui.service.audit.AuditService;
 import com.provectus.kafka.ui.service.ksql.KsqlServiceV2;
 import com.provectus.kafka.ui.service.ksql.KsqlServiceV2;
-import com.provectus.kafka.ui.service.rbac.AccessControlService;
 import java.util.List;
 import java.util.List;
 import java.util.Map;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Optional;
@@ -29,8 +27,6 @@ import reactor.core.publisher.Mono;
 public class KsqlController extends AbstractController implements KsqlApi {
 public class KsqlController extends AbstractController implements KsqlApi {
 
 
   private final KsqlServiceV2 ksqlServiceV2;
   private final KsqlServiceV2 ksqlServiceV2;
-  private final AccessControlService accessControlService;
-  private final AuditService auditService;
 
 
   @Override
   @Override
   public Mono<ResponseEntity<KsqlCommandV2ResponseDTO>> executeKsql(String clusterName,
   public Mono<ResponseEntity<KsqlCommandV2ResponseDTO>> executeKsql(String clusterName,
@@ -44,13 +40,13 @@ public class KsqlController extends AbstractController implements KsqlApi {
                   .operationName("executeKsql")
                   .operationName("executeKsql")
                   .operationParams(command)
                   .operationParams(command)
                   .build();
                   .build();
-              return accessControlService.validateAccess(context).thenReturn(
+              return validateAccess(context).thenReturn(
                       new KsqlCommandV2ResponseDTO().pipeId(
                       new KsqlCommandV2ResponseDTO().pipeId(
                           ksqlServiceV2.registerCommand(
                           ksqlServiceV2.registerCommand(
                               getCluster(clusterName),
                               getCluster(clusterName),
                               command.getKsql(),
                               command.getKsql(),
                               Optional.ofNullable(command.getStreamsProperties()).orElse(Map.of()))))
                               Optional.ofNullable(command.getStreamsProperties()).orElse(Map.of()))))
-                  .doOnEach(sig -> auditService.audit(context, sig));
+                  .doOnEach(sig -> audit(context, sig));
             }
             }
         )
         )
         .map(ResponseEntity::ok);
         .map(ResponseEntity::ok);
@@ -66,7 +62,7 @@ public class KsqlController extends AbstractController implements KsqlApi {
         .operationName("openKsqlResponsePipe")
         .operationName("openKsqlResponsePipe")
         .build();
         .build();
 
 
-    return accessControlService.validateAccess(context).thenReturn(
+    return validateAccess(context).thenReturn(
         ResponseEntity.ok(ksqlServiceV2.execute(pipeId)
         ResponseEntity.ok(ksqlServiceV2.execute(pipeId)
             .map(table -> new KsqlResponseDTO()
             .map(table -> new KsqlResponseDTO()
                 .table(
                 .table(
@@ -86,9 +82,9 @@ public class KsqlController extends AbstractController implements KsqlApi {
         .operationName("listStreams")
         .operationName("listStreams")
         .build();
         .build();
 
 
-    return accessControlService.validateAccess(context)
+    return validateAccess(context)
         .thenReturn(ResponseEntity.ok(ksqlServiceV2.listStreams(getCluster(clusterName))))
         .thenReturn(ResponseEntity.ok(ksqlServiceV2.listStreams(getCluster(clusterName))))
-        .doOnEach(sig -> auditService.audit(context, sig));
+        .doOnEach(sig -> audit(context, sig));
   }
   }
 
 
   @Override
   @Override
@@ -100,8 +96,8 @@ public class KsqlController extends AbstractController implements KsqlApi {
         .operationName("listTables")
         .operationName("listTables")
         .build();
         .build();
 
 
-    return accessControlService.validateAccess(context)
+    return validateAccess(context)
         .thenReturn(ResponseEntity.ok(ksqlServiceV2.listTables(getCluster(clusterName))))
         .thenReturn(ResponseEntity.ok(ksqlServiceV2.listTables(getCluster(clusterName))))
-        .doOnEach(sig -> auditService.audit(context, sig));
+        .doOnEach(sig -> audit(context, sig));
   }
   }
 }
 }

+ 7 - 11
kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/MessagesController.java

@@ -24,8 +24,6 @@ import com.provectus.kafka.ui.model.rbac.permission.AuditAction;
 import com.provectus.kafka.ui.model.rbac.permission.TopicAction;
 import com.provectus.kafka.ui.model.rbac.permission.TopicAction;
 import com.provectus.kafka.ui.service.DeserializationService;
 import com.provectus.kafka.ui.service.DeserializationService;
 import com.provectus.kafka.ui.service.MessagesService;
 import com.provectus.kafka.ui.service.MessagesService;
-import com.provectus.kafka.ui.service.audit.AuditService;
-import com.provectus.kafka.ui.service.rbac.AccessControlService;
 import java.util.List;
 import java.util.List;
 import java.util.Map;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Optional;
@@ -49,8 +47,6 @@ public class MessagesController extends AbstractController implements MessagesAp
 
 
   private final MessagesService messagesService;
   private final MessagesService messagesService;
   private final DeserializationService deserializationService;
   private final DeserializationService deserializationService;
-  private final AccessControlService accessControlService;
-  private final AuditService auditService;
 
 
   @Override
   @Override
   public Mono<ResponseEntity<Void>> deleteTopicMessages(
   public Mono<ResponseEntity<Void>> deleteTopicMessages(
@@ -63,13 +59,13 @@ public class MessagesController extends AbstractController implements MessagesAp
         .topicActions(MESSAGES_DELETE)
         .topicActions(MESSAGES_DELETE)
         .build();
         .build();
 
 
-    return accessControlService.validateAccess(context).<ResponseEntity<Void>>then(
+    return validateAccess(context).<ResponseEntity<Void>>then(
         messagesService.deleteTopicMessages(
         messagesService.deleteTopicMessages(
             getCluster(clusterName),
             getCluster(clusterName),
             topicName,
             topicName,
             Optional.ofNullable(partitions).orElse(List.of())
             Optional.ofNullable(partitions).orElse(List.of())
         ).thenReturn(ResponseEntity.ok().build())
         ).thenReturn(ResponseEntity.ok().build())
-    ).doOnEach(sig -> auditService.audit(context, sig));
+    ).doOnEach(sig -> audit(context, sig));
   }
   }
 
 
   @Override
   @Override
@@ -120,9 +116,9 @@ public class MessagesController extends AbstractController implements MessagesAp
     );
     );
 
 
     var context = contextBuilder.build();
     var context = contextBuilder.build();
-    return accessControlService.validateAccess(context)
+    return validateAccess(context)
         .then(job)
         .then(job)
-        .doOnEach(sig -> auditService.audit(context, sig));
+        .doOnEach(sig -> audit(context, sig));
   }
   }
 
 
   @Override
   @Override
@@ -137,11 +133,11 @@ public class MessagesController extends AbstractController implements MessagesAp
         .operationName("sendTopicMessages")
         .operationName("sendTopicMessages")
         .build();
         .build();
 
 
-    return accessControlService.validateAccess(context).then(
+    return validateAccess(context).then(
         createTopicMessage.flatMap(msg ->
         createTopicMessage.flatMap(msg ->
             messagesService.sendMessage(getCluster(clusterName), topicName, msg).then()
             messagesService.sendMessage(getCluster(clusterName), topicName, msg).then()
         ).map(ResponseEntity::ok)
         ).map(ResponseEntity::ok)
-    ).doOnEach(sig -> auditService.audit(context, sig));
+    ).doOnEach(sig -> audit(context, sig));
   }
   }
 
 
   /**
   /**
@@ -192,7 +188,7 @@ public class MessagesController extends AbstractController implements MessagesAp
             ? deserializationService.getSerdesForSerialize(getCluster(clusterName), topicName, VALUE)
             ? deserializationService.getSerdesForSerialize(getCluster(clusterName), topicName, VALUE)
             : deserializationService.getSerdesForDeserialize(getCluster(clusterName), topicName, VALUE));
             : deserializationService.getSerdesForDeserialize(getCluster(clusterName), topicName, VALUE));
 
 
-    return accessControlService.validateAccess(context).then(
+    return validateAccess(context).then(
         Mono.just(dto)
         Mono.just(dto)
             .subscribeOn(Schedulers.boundedElastic())
             .subscribeOn(Schedulers.boundedElastic())
             .map(ResponseEntity::ok)
             .map(ResponseEntity::ok)

+ 21 - 25
kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/SchemasController.java

@@ -13,8 +13,6 @@ import com.provectus.kafka.ui.model.SchemaSubjectsResponseDTO;
 import com.provectus.kafka.ui.model.rbac.AccessContext;
 import com.provectus.kafka.ui.model.rbac.AccessContext;
 import com.provectus.kafka.ui.model.rbac.permission.SchemaAction;
 import com.provectus.kafka.ui.model.rbac.permission.SchemaAction;
 import com.provectus.kafka.ui.service.SchemaRegistryService;
 import com.provectus.kafka.ui.service.SchemaRegistryService;
-import com.provectus.kafka.ui.service.audit.AuditService;
-import com.provectus.kafka.ui.service.rbac.AccessControlService;
 import java.util.List;
 import java.util.List;
 import java.util.Map;
 import java.util.Map;
 import java.util.stream.Collectors;
 import java.util.stream.Collectors;
@@ -38,8 +36,6 @@ public class SchemasController extends AbstractController implements SchemasApi
   private final KafkaSrMapper kafkaSrMapper = new KafkaSrMapperImpl();
   private final KafkaSrMapper kafkaSrMapper = new KafkaSrMapperImpl();
 
 
   private final SchemaRegistryService schemaRegistryService;
   private final SchemaRegistryService schemaRegistryService;
-  private final AccessControlService accessControlService;
-  private final AuditService auditService;
 
 
   @Override
   @Override
   protected KafkaCluster getCluster(String clusterName) {
   protected KafkaCluster getCluster(String clusterName) {
@@ -61,7 +57,7 @@ public class SchemasController extends AbstractController implements SchemasApi
         .operationName("checkSchemaCompatibility")
         .operationName("checkSchemaCompatibility")
         .build();
         .build();
 
 
-    return accessControlService.validateAccess(context).then(
+    return validateAccess(context).then(
         newSchemaSubjectMono.flatMap(subjectDTO ->
         newSchemaSubjectMono.flatMap(subjectDTO ->
                 schemaRegistryService.checksSchemaCompatibility(
                 schemaRegistryService.checksSchemaCompatibility(
                     getCluster(clusterName),
                     getCluster(clusterName),
@@ -70,7 +66,7 @@ public class SchemasController extends AbstractController implements SchemasApi
                 ))
                 ))
             .map(kafkaSrMapper::toDto)
             .map(kafkaSrMapper::toDto)
             .map(ResponseEntity::ok)
             .map(ResponseEntity::ok)
-    ).doOnEach(sig -> auditService.audit(context, sig));
+    ).doOnEach(sig -> audit(context, sig));
   }
   }
 
 
   @Override
   @Override
@@ -83,7 +79,7 @@ public class SchemasController extends AbstractController implements SchemasApi
         .operationName("createNewSchema")
         .operationName("createNewSchema")
         .build();
         .build();
 
 
-    return accessControlService.validateAccess(context).then(
+    return validateAccess(context).then(
         newSchemaSubjectMono.flatMap(newSubject ->
         newSchemaSubjectMono.flatMap(newSubject ->
                 schemaRegistryService.registerNewSchema(
                 schemaRegistryService.registerNewSchema(
                     getCluster(clusterName),
                     getCluster(clusterName),
@@ -92,7 +88,7 @@ public class SchemasController extends AbstractController implements SchemasApi
                 )
                 )
             ).map(kafkaSrMapper::toDto)
             ).map(kafkaSrMapper::toDto)
             .map(ResponseEntity::ok)
             .map(ResponseEntity::ok)
-    ).doOnEach(sig -> auditService.audit(context, sig));
+    ).doOnEach(sig -> audit(context, sig));
   }
   }
 
 
   @Override
   @Override
@@ -105,9 +101,9 @@ public class SchemasController extends AbstractController implements SchemasApi
         .operationName("deleteLatestSchema")
         .operationName("deleteLatestSchema")
         .build();
         .build();
 
 
-    return accessControlService.validateAccess(context).then(
+    return validateAccess(context).then(
         schemaRegistryService.deleteLatestSchemaSubject(getCluster(clusterName), subject)
         schemaRegistryService.deleteLatestSchemaSubject(getCluster(clusterName), subject)
-            .doOnEach(sig -> auditService.audit(context, sig))
+            .doOnEach(sig -> audit(context, sig))
             .thenReturn(ResponseEntity.ok().build())
             .thenReturn(ResponseEntity.ok().build())
     );
     );
   }
   }
@@ -122,9 +118,9 @@ public class SchemasController extends AbstractController implements SchemasApi
         .operationName("deleteSchema")
         .operationName("deleteSchema")
         .build();
         .build();
 
 
-    return accessControlService.validateAccess(context).then(
+    return validateAccess(context).then(
         schemaRegistryService.deleteSchemaSubjectEntirely(getCluster(clusterName), subject)
         schemaRegistryService.deleteSchemaSubjectEntirely(getCluster(clusterName), subject)
-            .doOnEach(sig -> auditService.audit(context, sig))
+            .doOnEach(sig -> audit(context, sig))
             .thenReturn(ResponseEntity.ok().build())
             .thenReturn(ResponseEntity.ok().build())
     );
     );
   }
   }
@@ -139,9 +135,9 @@ public class SchemasController extends AbstractController implements SchemasApi
         .operationName("deleteSchemaByVersion")
         .operationName("deleteSchemaByVersion")
         .build();
         .build();
 
 
-    return accessControlService.validateAccess(context).then(
+    return validateAccess(context).then(
         schemaRegistryService.deleteSchemaSubjectByVersion(getCluster(clusterName), subjectName, version)
         schemaRegistryService.deleteSchemaSubjectByVersion(getCluster(clusterName), subjectName, version)
-            .doOnEach(sig -> auditService.audit(context, sig))
+            .doOnEach(sig -> audit(context, sig))
             .thenReturn(ResponseEntity.ok().build())
             .thenReturn(ResponseEntity.ok().build())
     );
     );
   }
   }
@@ -160,9 +156,9 @@ public class SchemasController extends AbstractController implements SchemasApi
         schemaRegistryService.getAllVersionsBySubject(getCluster(clusterName), subjectName)
         schemaRegistryService.getAllVersionsBySubject(getCluster(clusterName), subjectName)
             .map(kafkaSrMapper::toDto);
             .map(kafkaSrMapper::toDto);
 
 
-    return accessControlService.validateAccess(context)
+    return validateAccess(context)
         .thenReturn(ResponseEntity.ok(schemas))
         .thenReturn(ResponseEntity.ok(schemas))
-        .doOnEach(sig -> auditService.audit(context, sig));
+        .doOnEach(sig -> audit(context, sig));
   }
   }
 
 
   @Override
   @Override
@@ -185,11 +181,11 @@ public class SchemasController extends AbstractController implements SchemasApi
         .operationName("getLatestSchema")
         .operationName("getLatestSchema")
         .build();
         .build();
 
 
-    return accessControlService.validateAccess(context).then(
+    return validateAccess(context).then(
         schemaRegistryService.getLatestSchemaVersionBySubject(getCluster(clusterName), subject)
         schemaRegistryService.getLatestSchemaVersionBySubject(getCluster(clusterName), subject)
             .map(kafkaSrMapper::toDto)
             .map(kafkaSrMapper::toDto)
             .map(ResponseEntity::ok)
             .map(ResponseEntity::ok)
-    ).doOnEach(sig -> auditService.audit(context, sig));
+    ).doOnEach(sig -> audit(context, sig));
   }
   }
 
 
   @Override
   @Override
@@ -203,12 +199,12 @@ public class SchemasController extends AbstractController implements SchemasApi
         .operationParams(Map.of("subject", subject, "version", version))
         .operationParams(Map.of("subject", subject, "version", version))
         .build();
         .build();
 
 
-    return accessControlService.validateAccess(context).then(
+    return validateAccess(context).then(
         schemaRegistryService.getSchemaSubjectByVersion(
         schemaRegistryService.getSchemaSubjectByVersion(
                 getCluster(clusterName), subject, version)
                 getCluster(clusterName), subject, version)
             .map(kafkaSrMapper::toDto)
             .map(kafkaSrMapper::toDto)
             .map(ResponseEntity::ok)
             .map(ResponseEntity::ok)
-    ).doOnEach(sig -> auditService.audit(context, sig));
+    ).doOnEach(sig -> audit(context, sig));
   }
   }
 
 
   @Override
   @Override
@@ -244,7 +240,7 @@ public class SchemasController extends AbstractController implements SchemasApi
               .map(subjs -> subjs.stream().map(kafkaSrMapper::toDto).toList())
               .map(subjs -> subjs.stream().map(kafkaSrMapper::toDto).toList())
               .map(subjs -> new SchemaSubjectsResponseDTO().pageCount(totalPages).schemas(subjs));
               .map(subjs -> new SchemaSubjectsResponseDTO().pageCount(totalPages).schemas(subjs));
         }).map(ResponseEntity::ok)
         }).map(ResponseEntity::ok)
-        .doOnEach(sig -> auditService.audit(context, sig));
+        .doOnEach(sig -> audit(context, sig));
   }
   }
 
 
   @Override
   @Override
@@ -257,14 +253,14 @@ public class SchemasController extends AbstractController implements SchemasApi
         .operationName("updateGlobalSchemaCompatibilityLevel")
         .operationName("updateGlobalSchemaCompatibilityLevel")
         .build();
         .build();
 
 
-    return accessControlService.validateAccess(context).then(
+    return validateAccess(context).then(
         compatibilityLevelMono
         compatibilityLevelMono
             .flatMap(compatibilityLevelDTO ->
             .flatMap(compatibilityLevelDTO ->
                 schemaRegistryService.updateGlobalSchemaCompatibility(
                 schemaRegistryService.updateGlobalSchemaCompatibility(
                     getCluster(clusterName),
                     getCluster(clusterName),
                     kafkaSrMapper.fromDto(compatibilityLevelDTO.getCompatibility())
                     kafkaSrMapper.fromDto(compatibilityLevelDTO.getCompatibility())
                 ))
                 ))
-            .doOnEach(sig -> auditService.audit(context, sig))
+            .doOnEach(sig -> audit(context, sig))
             .thenReturn(ResponseEntity.ok().build())
             .thenReturn(ResponseEntity.ok().build())
     );
     );
   }
   }
@@ -280,7 +276,7 @@ public class SchemasController extends AbstractController implements SchemasApi
         .operationParams(Map.of("subject", subject))
         .operationParams(Map.of("subject", subject))
         .build();
         .build();
 
 
-    return accessControlService.validateAccess(context).then(
+    return validateAccess(context).then(
         compatibilityLevelMono
         compatibilityLevelMono
             .flatMap(compatibilityLevelDTO ->
             .flatMap(compatibilityLevelDTO ->
                 schemaRegistryService.updateSchemaCompatibility(
                 schemaRegistryService.updateSchemaCompatibility(
@@ -288,7 +284,7 @@ public class SchemasController extends AbstractController implements SchemasApi
                     subject,
                     subject,
                     kafkaSrMapper.fromDto(compatibilityLevelDTO.getCompatibility())
                     kafkaSrMapper.fromDto(compatibilityLevelDTO.getCompatibility())
                 ))
                 ))
-            .doOnEach(sig -> auditService.audit(context, sig))
+            .doOnEach(sig -> audit(context, sig))
             .thenReturn(ResponseEntity.ok().build())
             .thenReturn(ResponseEntity.ok().build())
     );
     );
   }
   }

+ 25 - 29
kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/TopicsController.java

@@ -27,8 +27,6 @@ import com.provectus.kafka.ui.model.TopicsResponseDTO;
 import com.provectus.kafka.ui.model.rbac.AccessContext;
 import com.provectus.kafka.ui.model.rbac.AccessContext;
 import com.provectus.kafka.ui.service.TopicsService;
 import com.provectus.kafka.ui.service.TopicsService;
 import com.provectus.kafka.ui.service.analyze.TopicAnalysisService;
 import com.provectus.kafka.ui.service.analyze.TopicAnalysisService;
-import com.provectus.kafka.ui.service.audit.AuditService;
-import com.provectus.kafka.ui.service.rbac.AccessControlService;
 import java.util.Comparator;
 import java.util.Comparator;
 import java.util.List;
 import java.util.List;
 import java.util.Map;
 import java.util.Map;
@@ -53,8 +51,6 @@ public class TopicsController extends AbstractController implements TopicsApi {
   private final TopicsService topicsService;
   private final TopicsService topicsService;
   private final TopicAnalysisService topicAnalysisService;
   private final TopicAnalysisService topicAnalysisService;
   private final ClusterMapper clusterMapper;
   private final ClusterMapper clusterMapper;
-  private final AccessControlService accessControlService;
-  private final AuditService auditService;
 
 
   @Override
   @Override
   public Mono<ResponseEntity<TopicDTO>> createTopic(
   public Mono<ResponseEntity<TopicDTO>> createTopic(
@@ -67,12 +63,12 @@ public class TopicsController extends AbstractController implements TopicsApi {
           .operationParams(topicCreation)
           .operationParams(topicCreation)
           .build();
           .build();
 
 
-      return accessControlService.validateAccess(context)
+      return validateAccess(context)
           .then(topicsService.createTopic(getCluster(clusterName), topicCreation))
           .then(topicsService.createTopic(getCluster(clusterName), topicCreation))
           .map(clusterMapper::toTopic)
           .map(clusterMapper::toTopic)
           .map(s -> new ResponseEntity<>(s, HttpStatus.OK))
           .map(s -> new ResponseEntity<>(s, HttpStatus.OK))
           .switchIfEmpty(Mono.just(ResponseEntity.notFound().build()))
           .switchIfEmpty(Mono.just(ResponseEntity.notFound().build()))
-          .doOnEach(sig -> auditService.audit(context, sig));
+          .doOnEach(sig -> audit(context, sig));
     });
     });
   }
   }
 
 
@@ -86,11 +82,11 @@ public class TopicsController extends AbstractController implements TopicsApi {
         .operationName("recreateTopic")
         .operationName("recreateTopic")
         .build();
         .build();
 
 
-    return accessControlService.validateAccess(context).then(
+    return validateAccess(context).then(
         topicsService.recreateTopic(getCluster(clusterName), topicName)
         topicsService.recreateTopic(getCluster(clusterName), topicName)
             .map(clusterMapper::toTopic)
             .map(clusterMapper::toTopic)
             .map(s -> new ResponseEntity<>(s, HttpStatus.CREATED))
             .map(s -> new ResponseEntity<>(s, HttpStatus.CREATED))
-    ).doOnEach(sig -> auditService.audit(context, sig));
+    ).doOnEach(sig -> audit(context, sig));
   }
   }
 
 
   @Override
   @Override
@@ -105,11 +101,11 @@ public class TopicsController extends AbstractController implements TopicsApi {
         .operationParams(Map.of("newTopicName", newTopicName))
         .operationParams(Map.of("newTopicName", newTopicName))
         .build();
         .build();
 
 
-    return accessControlService.validateAccess(context)
+    return validateAccess(context)
         .then(topicsService.cloneTopic(getCluster(clusterName), topicName, newTopicName)
         .then(topicsService.cloneTopic(getCluster(clusterName), topicName, newTopicName)
             .map(clusterMapper::toTopic)
             .map(clusterMapper::toTopic)
             .map(s -> new ResponseEntity<>(s, HttpStatus.CREATED))
             .map(s -> new ResponseEntity<>(s, HttpStatus.CREATED))
-        ).doOnEach(sig -> auditService.audit(context, sig));
+        ).doOnEach(sig -> audit(context, sig));
   }
   }
 
 
   @Override
   @Override
@@ -123,11 +119,11 @@ public class TopicsController extends AbstractController implements TopicsApi {
         .operationName("deleteTopic")
         .operationName("deleteTopic")
         .build();
         .build();
 
 
-    return accessControlService.validateAccess(context)
+    return validateAccess(context)
         .then(
         .then(
             topicsService.deleteTopic(getCluster(clusterName), topicName)
             topicsService.deleteTopic(getCluster(clusterName), topicName)
                 .thenReturn(ResponseEntity.ok().<Void>build())
                 .thenReturn(ResponseEntity.ok().<Void>build())
-        ).doOnEach(sig -> auditService.audit(context, sig));
+        ).doOnEach(sig -> audit(context, sig));
   }
   }
 
 
 
 
@@ -142,7 +138,7 @@ public class TopicsController extends AbstractController implements TopicsApi {
         .operationName("getTopicConfigs")
         .operationName("getTopicConfigs")
         .build();
         .build();
 
 
-    return accessControlService.validateAccess(context).then(
+    return validateAccess(context).then(
         topicsService.getTopicConfigs(getCluster(clusterName), topicName)
         topicsService.getTopicConfigs(getCluster(clusterName), topicName)
             .map(lst -> lst.stream()
             .map(lst -> lst.stream()
                 .map(InternalTopicConfig::from)
                 .map(InternalTopicConfig::from)
@@ -150,7 +146,7 @@ public class TopicsController extends AbstractController implements TopicsApi {
                 .collect(toList()))
                 .collect(toList()))
             .map(Flux::fromIterable)
             .map(Flux::fromIterable)
             .map(ResponseEntity::ok)
             .map(ResponseEntity::ok)
-    ).doOnEach(sig -> auditService.audit(context, sig));
+    ).doOnEach(sig -> audit(context, sig));
   }
   }
 
 
   @Override
   @Override
@@ -164,11 +160,11 @@ public class TopicsController extends AbstractController implements TopicsApi {
         .operationName("getTopicDetails")
         .operationName("getTopicDetails")
         .build();
         .build();
 
 
-    return accessControlService.validateAccess(context).then(
+    return validateAccess(context).then(
         topicsService.getTopicDetails(getCluster(clusterName), topicName)
         topicsService.getTopicDetails(getCluster(clusterName), topicName)
             .map(clusterMapper::toTopicDetails)
             .map(clusterMapper::toTopicDetails)
             .map(ResponseEntity::ok)
             .map(ResponseEntity::ok)
-    ).doOnEach(sig -> auditService.audit(context, sig));
+    ).doOnEach(sig -> audit(context, sig));
   }
   }
 
 
   @Override
   @Override
@@ -215,7 +211,7 @@ public class TopicsController extends AbstractController implements TopicsApi {
                       .pageCount(totalPages));
                       .pageCount(totalPages));
         })
         })
         .map(ResponseEntity::ok)
         .map(ResponseEntity::ok)
-        .doOnEach(sig -> auditService.audit(context, sig));
+        .doOnEach(sig -> audit(context, sig));
   }
   }
 
 
   @Override
   @Override
@@ -230,12 +226,12 @@ public class TopicsController extends AbstractController implements TopicsApi {
         .operationName("updateTopic")
         .operationName("updateTopic")
         .build();
         .build();
 
 
-    return accessControlService.validateAccess(context).then(
+    return validateAccess(context).then(
         topicsService
         topicsService
             .updateTopic(getCluster(clusterName), topicName, topicUpdate)
             .updateTopic(getCluster(clusterName), topicName, topicUpdate)
             .map(clusterMapper::toTopic)
             .map(clusterMapper::toTopic)
             .map(ResponseEntity::ok)
             .map(ResponseEntity::ok)
-    ).doOnEach(sig -> auditService.audit(context, sig));
+    ).doOnEach(sig -> audit(context, sig));
   }
   }
 
 
   @Override
   @Override
@@ -250,11 +246,11 @@ public class TopicsController extends AbstractController implements TopicsApi {
         .topicActions(VIEW, EDIT)
         .topicActions(VIEW, EDIT)
         .build();
         .build();
 
 
-    return accessControlService.validateAccess(context).then(
+    return validateAccess(context).then(
         partitionsIncrease.flatMap(partitions ->
         partitionsIncrease.flatMap(partitions ->
             topicsService.increaseTopicPartitions(getCluster(clusterName), topicName, partitions)
             topicsService.increaseTopicPartitions(getCluster(clusterName), topicName, partitions)
         ).map(ResponseEntity::ok)
         ).map(ResponseEntity::ok)
-    ).doOnEach(sig -> auditService.audit(context, sig));
+    ).doOnEach(sig -> audit(context, sig));
   }
   }
 
 
   @Override
   @Override
@@ -270,12 +266,12 @@ public class TopicsController extends AbstractController implements TopicsApi {
         .operationName("changeReplicationFactor")
         .operationName("changeReplicationFactor")
         .build();
         .build();
 
 
-    return accessControlService.validateAccess(context).then(
+    return validateAccess(context).then(
         replicationFactorChange
         replicationFactorChange
             .flatMap(rfc ->
             .flatMap(rfc ->
                 topicsService.changeReplicationFactor(getCluster(clusterName), topicName, rfc))
                 topicsService.changeReplicationFactor(getCluster(clusterName), topicName, rfc))
             .map(ResponseEntity::ok)
             .map(ResponseEntity::ok)
-    ).doOnEach(sig -> auditService.audit(context, sig));
+    ).doOnEach(sig -> audit(context, sig));
   }
   }
 
 
   @Override
   @Override
@@ -288,9 +284,9 @@ public class TopicsController extends AbstractController implements TopicsApi {
         .operationName("analyzeTopic")
         .operationName("analyzeTopic")
         .build();
         .build();
 
 
-    return accessControlService.validateAccess(context).then(
+    return validateAccess(context).then(
         topicAnalysisService.analyze(getCluster(clusterName), topicName)
         topicAnalysisService.analyze(getCluster(clusterName), topicName)
-            .doOnEach(sig -> auditService.audit(context, sig))
+            .doOnEach(sig -> audit(context, sig))
             .thenReturn(ResponseEntity.ok().build())
             .thenReturn(ResponseEntity.ok().build())
     );
     );
   }
   }
@@ -305,9 +301,9 @@ public class TopicsController extends AbstractController implements TopicsApi {
         .operationName("cancelTopicAnalysis")
         .operationName("cancelTopicAnalysis")
         .build();
         .build();
 
 
-    return accessControlService.validateAccess(context)
+    return validateAccess(context)
         .then(Mono.fromRunnable(() -> topicAnalysisService.cancelAnalysis(getCluster(clusterName), topicName)))
         .then(Mono.fromRunnable(() -> topicAnalysisService.cancelAnalysis(getCluster(clusterName), topicName)))
-        .doOnEach(sig -> auditService.audit(context, sig))
+        .doOnEach(sig -> audit(context, sig))
         .thenReturn(ResponseEntity.ok().build());
         .thenReturn(ResponseEntity.ok().build());
   }
   }
 
 
@@ -324,11 +320,11 @@ public class TopicsController extends AbstractController implements TopicsApi {
         .operationName("getTopicAnalysis")
         .operationName("getTopicAnalysis")
         .build();
         .build();
 
 
-    return accessControlService.validateAccess(context)
+    return validateAccess(context)
         .thenReturn(topicAnalysisService.getTopicAnalysis(getCluster(clusterName), topicName)
         .thenReturn(topicAnalysisService.getTopicAnalysis(getCluster(clusterName), topicName)
             .map(ResponseEntity::ok)
             .map(ResponseEntity::ok)
             .orElseGet(() -> ResponseEntity.notFound().build()))
             .orElseGet(() -> ResponseEntity.notFound().build()))
-        .doOnEach(sig -> auditService.audit(context, sig));
+        .doOnEach(sig -> audit(context, sig));
   }
   }
 
 
   private Comparator<InternalTopic> getComparatorForTopic(
   private Comparator<InternalTopic> getComparatorForTopic(

+ 11 - 1
kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/rbac/permission/AclAction.java

@@ -1,15 +1,25 @@
 package com.provectus.kafka.ui.model.rbac.permission;
 package com.provectus.kafka.ui.model.rbac.permission;
 
 
+import java.util.Set;
 import org.apache.commons.lang3.EnumUtils;
 import org.apache.commons.lang3.EnumUtils;
 import org.jetbrains.annotations.Nullable;
 import org.jetbrains.annotations.Nullable;
 
 
 public enum AclAction implements PermissibleAction {
 public enum AclAction implements PermissibleAction {
 
 
   VIEW,
   VIEW,
-  EDIT;
+  EDIT
+
+  ;
+
+  public static final Set<AclAction> ALTER_ACTIONS = Set.of(EDIT);
 
 
   @Nullable
   @Nullable
   public static AclAction fromString(String name) {
   public static AclAction fromString(String name) {
     return EnumUtils.getEnum(AclAction.class, name);
     return EnumUtils.getEnum(AclAction.class, name);
   }
   }
+
+  @Override
+  public boolean isAlter() {
+    return ALTER_ACTIONS.contains(this);
+  }
 }
 }

+ 7 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/rbac/permission/ApplicationConfigAction.java

@@ -1,5 +1,6 @@
 package com.provectus.kafka.ui.model.rbac.permission;
 package com.provectus.kafka.ui.model.rbac.permission;
 
 
+import java.util.Set;
 import org.apache.commons.lang3.EnumUtils;
 import org.apache.commons.lang3.EnumUtils;
 import org.jetbrains.annotations.Nullable;
 import org.jetbrains.annotations.Nullable;
 
 
@@ -10,9 +11,15 @@ public enum ApplicationConfigAction implements PermissibleAction {
 
 
   ;
   ;
 
 
+  public static final Set<ApplicationConfigAction> ALTER_ACTIONS = Set.of(EDIT);
+
   @Nullable
   @Nullable
   public static ApplicationConfigAction fromString(String name) {
   public static ApplicationConfigAction fromString(String name) {
     return EnumUtils.getEnum(ApplicationConfigAction.class, name);
     return EnumUtils.getEnum(ApplicationConfigAction.class, name);
   }
   }
 
 
+  @Override
+  public boolean isAlter() {
+    return ALTER_ACTIONS.contains(this);
+  }
 }
 }

+ 11 - 1
kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/rbac/permission/AuditAction.java

@@ -1,14 +1,24 @@
 package com.provectus.kafka.ui.model.rbac.permission;
 package com.provectus.kafka.ui.model.rbac.permission;
 
 
+import java.util.Set;
 import org.apache.commons.lang3.EnumUtils;
 import org.apache.commons.lang3.EnumUtils;
 import org.jetbrains.annotations.Nullable;
 import org.jetbrains.annotations.Nullable;
 
 
 public enum AuditAction implements PermissibleAction {
 public enum AuditAction implements PermissibleAction {
 
 
-  VIEW;
+  VIEW
+
+  ;
+
+  private static final Set<AuditAction> ALTER_ACTIONS = Set.of();
 
 
   @Nullable
   @Nullable
   public static AuditAction fromString(String name) {
   public static AuditAction fromString(String name) {
     return EnumUtils.getEnum(AuditAction.class, name);
     return EnumUtils.getEnum(AuditAction.class, name);
   }
   }
+
+  @Override
+  public boolean isAlter() {
+    return ALTER_ACTIONS.contains(this);
+  }
 }
 }

+ 7 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/rbac/permission/ClusterConfigAction.java

@@ -1,5 +1,6 @@
 package com.provectus.kafka.ui.model.rbac.permission;
 package com.provectus.kafka.ui.model.rbac.permission;
 
 
+import java.util.Set;
 import org.apache.commons.lang3.EnumUtils;
 import org.apache.commons.lang3.EnumUtils;
 import org.jetbrains.annotations.Nullable;
 import org.jetbrains.annotations.Nullable;
 
 
@@ -10,9 +11,15 @@ public enum ClusterConfigAction implements PermissibleAction {
 
 
   ;
   ;
 
 
+  public static final Set<ClusterConfigAction> ALTER_ACTIONS = Set.of(EDIT);
+
   @Nullable
   @Nullable
   public static ClusterConfigAction fromString(String name) {
   public static ClusterConfigAction fromString(String name) {
     return EnumUtils.getEnum(ClusterConfigAction.class, name);
     return EnumUtils.getEnum(ClusterConfigAction.class, name);
   }
   }
 
 
+  @Override
+  public boolean isAlter() {
+    return ALTER_ACTIONS.contains(this);
+  }
 }
 }

+ 7 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/rbac/permission/ConnectAction.java

@@ -1,5 +1,6 @@
 package com.provectus.kafka.ui.model.rbac.permission;
 package com.provectus.kafka.ui.model.rbac.permission;
 
 
+import java.util.Set;
 import org.apache.commons.lang3.EnumUtils;
 import org.apache.commons.lang3.EnumUtils;
 import org.jetbrains.annotations.Nullable;
 import org.jetbrains.annotations.Nullable;
 
 
@@ -12,9 +13,15 @@ public enum ConnectAction implements PermissibleAction {
 
 
   ;
   ;
 
 
+  public static final Set<ConnectAction> ALTER_ACTIONS = Set.of(CREATE, EDIT, RESTART);
+
   @Nullable
   @Nullable
   public static ConnectAction fromString(String name) {
   public static ConnectAction fromString(String name) {
     return EnumUtils.getEnum(ConnectAction.class, name);
     return EnumUtils.getEnum(ConnectAction.class, name);
   }
   }
 
 
+  @Override
+  public boolean isAlter() {
+    return ALTER_ACTIONS.contains(this);
+  }
 }
 }

+ 7 - 1
kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/rbac/permission/ConsumerGroupAction.java

@@ -1,5 +1,6 @@
 package com.provectus.kafka.ui.model.rbac.permission;
 package com.provectus.kafka.ui.model.rbac.permission;
 
 
+import java.util.Set;
 import org.apache.commons.lang3.EnumUtils;
 import org.apache.commons.lang3.EnumUtils;
 import org.jetbrains.annotations.Nullable;
 import org.jetbrains.annotations.Nullable;
 
 
@@ -7,14 +8,19 @@ public enum ConsumerGroupAction implements PermissibleAction {
 
 
   VIEW,
   VIEW,
   DELETE,
   DELETE,
-
   RESET_OFFSETS
   RESET_OFFSETS
 
 
   ;
   ;
 
 
+  public static final Set<ConsumerGroupAction> ALTER_ACTIONS = Set.of(DELETE, RESET_OFFSETS);
+
   @Nullable
   @Nullable
   public static ConsumerGroupAction fromString(String name) {
   public static ConsumerGroupAction fromString(String name) {
     return EnumUtils.getEnum(ConsumerGroupAction.class, name);
     return EnumUtils.getEnum(ConsumerGroupAction.class, name);
   }
   }
 
 
+  @Override
+  public boolean isAlter() {
+    return ALTER_ACTIONS.contains(this);
+  }
 }
 }

+ 10 - 1
kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/rbac/permission/KsqlAction.java

@@ -1,15 +1,24 @@
 package com.provectus.kafka.ui.model.rbac.permission;
 package com.provectus.kafka.ui.model.rbac.permission;
 
 
+import java.util.Set;
 import org.apache.commons.lang3.EnumUtils;
 import org.apache.commons.lang3.EnumUtils;
 import org.jetbrains.annotations.Nullable;
 import org.jetbrains.annotations.Nullable;
 
 
 public enum KsqlAction implements PermissibleAction {
 public enum KsqlAction implements PermissibleAction {
 
 
-  EXECUTE;
+  EXECUTE
+
+  ;
+
+  public static final Set<KsqlAction> ALTER_ACTIONS = Set.of(EXECUTE);
 
 
   @Nullable
   @Nullable
   public static KsqlAction fromString(String name) {
   public static KsqlAction fromString(String name) {
     return EnumUtils.getEnum(KsqlAction.class, name);
     return EnumUtils.getEnum(KsqlAction.class, name);
   }
   }
 
 
+  @Override
+  public boolean isAlter() {
+    return ALTER_ACTIONS.contains(this);
+  }
 }
 }

+ 5 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/rbac/permission/PermissibleAction.java

@@ -5,4 +5,9 @@ public sealed interface PermissibleAction permits
     ConsumerGroupAction, SchemaAction,
     ConsumerGroupAction, SchemaAction,
     ConnectAction, ClusterConfigAction,
     ConnectAction, ClusterConfigAction,
     KsqlAction, TopicAction, AuditAction {
     KsqlAction, TopicAction, AuditAction {
+
+  String name();
+
+  boolean isAlter();
+
 }
 }

+ 7 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/rbac/permission/SchemaAction.java

@@ -1,5 +1,6 @@
 package com.provectus.kafka.ui.model.rbac.permission;
 package com.provectus.kafka.ui.model.rbac.permission;
 
 
+import java.util.Set;
 import org.apache.commons.lang3.EnumUtils;
 import org.apache.commons.lang3.EnumUtils;
 import org.jetbrains.annotations.Nullable;
 import org.jetbrains.annotations.Nullable;
 
 
@@ -13,9 +14,15 @@ public enum SchemaAction implements PermissibleAction {
 
 
   ;
   ;
 
 
+  public static final Set<SchemaAction> ALTER_ACTIONS = Set.of(CREATE, DELETE, EDIT, MODIFY_GLOBAL_COMPATIBILITY);
+
   @Nullable
   @Nullable
   public static SchemaAction fromString(String name) {
   public static SchemaAction fromString(String name) {
     return EnumUtils.getEnum(SchemaAction.class, name);
     return EnumUtils.getEnum(SchemaAction.class, name);
   }
   }
 
 
+  @Override
+  public boolean isAlter() {
+    return ALTER_ACTIONS.contains(this);
+  }
 }
 }

+ 7 - 1
kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/rbac/permission/TopicAction.java

@@ -1,5 +1,6 @@
 package com.provectus.kafka.ui.model.rbac.permission;
 package com.provectus.kafka.ui.model.rbac.permission;
 
 
+import java.util.Set;
 import org.apache.commons.lang3.EnumUtils;
 import org.apache.commons.lang3.EnumUtils;
 import org.jetbrains.annotations.Nullable;
 import org.jetbrains.annotations.Nullable;
 
 
@@ -9,16 +10,21 @@ public enum TopicAction implements PermissibleAction {
   CREATE,
   CREATE,
   EDIT,
   EDIT,
   DELETE,
   DELETE,
-
   MESSAGES_READ,
   MESSAGES_READ,
   MESSAGES_PRODUCE,
   MESSAGES_PRODUCE,
   MESSAGES_DELETE,
   MESSAGES_DELETE,
 
 
   ;
   ;
 
 
+  public static final Set<TopicAction> ALTER_ACTIONS = Set.of(CREATE, EDIT, DELETE, MESSAGES_PRODUCE, MESSAGES_DELETE);
+
   @Nullable
   @Nullable
   public static TopicAction fromString(String name) {
   public static TopicAction fromString(String name) {
     return EnumUtils.getEnum(TopicAction.class, name);
     return EnumUtils.getEnum(TopicAction.class, name);
   }
   }
 
 
+  @Override
+  public boolean isAlter() {
+    return ALTER_ACTIONS.contains(this);
+  }
 }
 }

+ 2 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/SerdesInitializer.java

@@ -16,6 +16,7 @@ import com.provectus.kafka.ui.serdes.builtin.HexSerde;
 import com.provectus.kafka.ui.serdes.builtin.Int32Serde;
 import com.provectus.kafka.ui.serdes.builtin.Int32Serde;
 import com.provectus.kafka.ui.serdes.builtin.Int64Serde;
 import com.provectus.kafka.ui.serdes.builtin.Int64Serde;
 import com.provectus.kafka.ui.serdes.builtin.ProtobufFileSerde;
 import com.provectus.kafka.ui.serdes.builtin.ProtobufFileSerde;
+import com.provectus.kafka.ui.serdes.builtin.ProtobufRawSerde;
 import com.provectus.kafka.ui.serdes.builtin.StringSerde;
 import com.provectus.kafka.ui.serdes.builtin.StringSerde;
 import com.provectus.kafka.ui.serdes.builtin.UInt32Serde;
 import com.provectus.kafka.ui.serdes.builtin.UInt32Serde;
 import com.provectus.kafka.ui.serdes.builtin.UInt64Serde;
 import com.provectus.kafka.ui.serdes.builtin.UInt64Serde;
@@ -50,6 +51,7 @@ public class SerdesInitializer {
             .put(Base64Serde.name(), Base64Serde.class)
             .put(Base64Serde.name(), Base64Serde.class)
             .put(HexSerde.name(), HexSerde.class)
             .put(HexSerde.name(), HexSerde.class)
             .put(UuidBinarySerde.name(), UuidBinarySerde.class)
             .put(UuidBinarySerde.name(), UuidBinarySerde.class)
+            .put(ProtobufRawSerde.name(), ProtobufRawSerde.class)
             .build(),
             .build(),
         new CustomSerdeLoader()
         new CustomSerdeLoader()
     );
     );

+ 59 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/serdes/builtin/ProtobufRawSerde.java

@@ -0,0 +1,59 @@
+package com.provectus.kafka.ui.serdes.builtin;
+
+import com.google.protobuf.UnknownFieldSet;
+import com.provectus.kafka.ui.exception.ValidationException;
+import com.provectus.kafka.ui.serde.api.DeserializeResult;
+import com.provectus.kafka.ui.serde.api.RecordHeaders;
+import com.provectus.kafka.ui.serde.api.SchemaDescription;
+import com.provectus.kafka.ui.serdes.BuiltInSerde;
+import java.util.Map;
+import java.util.Optional;
+import lombok.SneakyThrows;
+
+public class ProtobufRawSerde implements BuiltInSerde {
+
+  public static String name() {
+    return "ProtobufDecodeRaw";
+  }
+
+  @Override
+  public Optional<String> getDescription() {
+    return Optional.empty();
+  }
+
+  @Override
+  public Optional<SchemaDescription> getSchema(String topic, Target type) {
+    return Optional.empty();
+  }
+
+  @Override
+  public boolean canSerialize(String topic, Target type) {
+    return false;
+  }
+
+  @Override
+  public boolean canDeserialize(String topic, Target type) {
+    return true;
+  }
+
+  @Override
+  public Serializer serializer(String topic, Target type) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public Deserializer deserializer(String topic, Target type) {
+    return new Deserializer() {
+        @SneakyThrows
+        @Override
+        public DeserializeResult deserialize(RecordHeaders headers, byte[] data) {
+            try {
+              UnknownFieldSet unknownFields = UnknownFieldSet.parseFrom(data);
+              return new DeserializeResult(unknownFields.toString(), DeserializeResult.Type.STRING, Map.of());
+            } catch (Exception e) {
+              throw new ValidationException(e.getMessage());
+            }
+        }
+    };
+  }
+}

+ 14 - 9
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/audit/AuditRecord.java

@@ -6,6 +6,7 @@ import com.provectus.kafka.ui.exception.CustomBaseException;
 import com.provectus.kafka.ui.exception.ValidationException;
 import com.provectus.kafka.ui.exception.ValidationException;
 import com.provectus.kafka.ui.model.rbac.AccessContext;
 import com.provectus.kafka.ui.model.rbac.AccessContext;
 import com.provectus.kafka.ui.model.rbac.Resource;
 import com.provectus.kafka.ui.model.rbac.Resource;
+import com.provectus.kafka.ui.model.rbac.permission.PermissibleAction;
 import java.util.ArrayList;
 import java.util.ArrayList;
 import java.util.LinkedHashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.List;
@@ -33,16 +34,20 @@ record AuditRecord(String timestamp,
     return MAPPER.writeValueAsString(this);
     return MAPPER.writeValueAsString(this);
   }
   }
 
 
-  record AuditResource(String accessType, Resource type, @Nullable Object id) {
+  record AuditResource(String accessType, boolean alter, Resource type, @Nullable Object id) {
+
+    private static AuditResource create(PermissibleAction action, Resource type, @Nullable Object id) {
+      return new AuditResource(action.name(), action.isAlter(), type, id);
+    }
 
 
     static List<AuditResource> getAccessedResources(AccessContext ctx) {
     static List<AuditResource> getAccessedResources(AccessContext ctx) {
       List<AuditResource> resources = new ArrayList<>();
       List<AuditResource> resources = new ArrayList<>();
       ctx.getClusterConfigActions()
       ctx.getClusterConfigActions()
-          .forEach(a -> resources.add(new AuditResource(a.name(), Resource.CLUSTERCONFIG, null)));
+          .forEach(a -> resources.add(create(a, Resource.CLUSTERCONFIG, null)));
       ctx.getTopicActions()
       ctx.getTopicActions()
-          .forEach(a -> resources.add(new AuditResource(a.name(), Resource.TOPIC, nameId(ctx.getTopic()))));
+          .forEach(a -> resources.add(create(a, Resource.TOPIC, nameId(ctx.getTopic()))));
       ctx.getConsumerGroupActions()
       ctx.getConsumerGroupActions()
-          .forEach(a -> resources.add(new AuditResource(a.name(), Resource.CONSUMER, nameId(ctx.getConsumerGroup()))));
+          .forEach(a -> resources.add(create(a, Resource.CONSUMER, nameId(ctx.getConsumerGroup()))));
       ctx.getConnectActions()
       ctx.getConnectActions()
           .forEach(a -> {
           .forEach(a -> {
             Map<String, String> resourceId = new LinkedHashMap<>();
             Map<String, String> resourceId = new LinkedHashMap<>();
@@ -50,16 +55,16 @@ record AuditRecord(String timestamp,
             if (ctx.getConnector() != null) {
             if (ctx.getConnector() != null) {
               resourceId.put("connector", ctx.getConnector());
               resourceId.put("connector", ctx.getConnector());
             }
             }
-            resources.add(new AuditResource(a.name(), Resource.CONNECT, resourceId));
+            resources.add(create(a, Resource.CONNECT, resourceId));
           });
           });
       ctx.getSchemaActions()
       ctx.getSchemaActions()
-          .forEach(a -> resources.add(new AuditResource(a.name(), Resource.SCHEMA, nameId(ctx.getSchema()))));
+          .forEach(a -> resources.add(create(a, Resource.SCHEMA, nameId(ctx.getSchema()))));
       ctx.getKsqlActions()
       ctx.getKsqlActions()
-          .forEach(a -> resources.add(new AuditResource(a.name(), Resource.KSQL, null)));
+          .forEach(a -> resources.add(create(a, Resource.KSQL, null)));
       ctx.getAclActions()
       ctx.getAclActions()
-          .forEach(a -> resources.add(new AuditResource(a.name(), Resource.ACL, null)));
+          .forEach(a -> resources.add(create(a, Resource.ACL, null)));
       ctx.getAuditAction()
       ctx.getAuditAction()
-          .forEach(a -> resources.add(new AuditResource(a.name(), Resource.AUDIT, null)));
+          .forEach(a -> resources.add(create(a, Resource.AUDIT, null)));
       return resources;
       return resources;
     }
     }
 
 

+ 7 - 4
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/audit/AuditService.java

@@ -1,5 +1,6 @@
 package com.provectus.kafka.ui.service.audit;
 package com.provectus.kafka.ui.service.audit;
 
 
+import static com.provectus.kafka.ui.config.ClustersProperties.AuditProperties.LogLevel.ALTER_ONLY;
 import static com.provectus.kafka.ui.service.MessagesService.createProducer;
 import static com.provectus.kafka.ui.service.MessagesService.createProducer;
 
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.annotations.VisibleForTesting;
@@ -80,12 +81,13 @@ public class AuditService implements Closeable {
     }
     }
     boolean topicAudit = Optional.ofNullable(auditProps.getTopicAuditEnabled()).orElse(false);
     boolean topicAudit = Optional.ofNullable(auditProps.getTopicAuditEnabled()).orElse(false);
     boolean consoleAudit = Optional.ofNullable(auditProps.getConsoleAuditEnabled()).orElse(false);
     boolean consoleAudit = Optional.ofNullable(auditProps.getConsoleAuditEnabled()).orElse(false);
+    boolean alterLogOnly = Optional.ofNullable(auditProps.getLevel()).map(lvl -> lvl == ALTER_ONLY).orElse(true);
     if (!topicAudit && !consoleAudit) {
     if (!topicAudit && !consoleAudit) {
       return Optional.empty();
       return Optional.empty();
     }
     }
     if (!topicAudit) {
     if (!topicAudit) {
       log.info("Audit initialization finished for cluster '{}' (console only)", cluster.getName());
       log.info("Audit initialization finished for cluster '{}' (console only)", cluster.getName());
-      return Optional.of(consoleOnlyWriter(cluster));
+      return Optional.of(consoleOnlyWriter(cluster, alterLogOnly));
     }
     }
     String auditTopicName = Optional.ofNullable(auditProps.getTopic()).orElse(DEFAULT_AUDIT_TOPIC_NAME);
     String auditTopicName = Optional.ofNullable(auditProps.getTopic()).orElse(DEFAULT_AUDIT_TOPIC_NAME);
     boolean topicAuditCanBeDone = createTopicIfNeeded(cluster, acSupplier, auditTopicName, auditProps);
     boolean topicAuditCanBeDone = createTopicIfNeeded(cluster, acSupplier, auditTopicName, auditProps);
@@ -95,7 +97,7 @@ public class AuditService implements Closeable {
             "Audit initialization finished for cluster '{}' (console only, topic audit init failed)",
             "Audit initialization finished for cluster '{}' (console only, topic audit init failed)",
             cluster.getName()
             cluster.getName()
         );
         );
-        return Optional.of(consoleOnlyWriter(cluster));
+        return Optional.of(consoleOnlyWriter(cluster, alterLogOnly));
       }
       }
       return Optional.empty();
       return Optional.empty();
     }
     }
@@ -103,6 +105,7 @@ public class AuditService implements Closeable {
     return Optional.of(
     return Optional.of(
         new AuditWriter(
         new AuditWriter(
             cluster.getName(),
             cluster.getName(),
+            alterLogOnly,
             auditTopicName,
             auditTopicName,
             producerFactory.get(),
             producerFactory.get(),
             consoleAudit ? AUDIT_LOGGER : null
             consoleAudit ? AUDIT_LOGGER : null
@@ -110,8 +113,8 @@ public class AuditService implements Closeable {
     );
     );
   }
   }
 
 
-  private static AuditWriter consoleOnlyWriter(KafkaCluster cluster) {
-    return new AuditWriter(cluster.getName(), null, null, AUDIT_LOGGER);
+  private static AuditWriter consoleOnlyWriter(KafkaCluster cluster, boolean alterLogOnly) {
+    return new AuditWriter(cluster.getName(), alterLogOnly, null, null, AUDIT_LOGGER);
   }
   }
 
 
   /**
   /**

+ 5 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/audit/AuditWriter.java

@@ -18,6 +18,7 @@ import org.slf4j.Logger;
 
 
 @Slf4j
 @Slf4j
 record AuditWriter(String clusterName,
 record AuditWriter(String clusterName,
+                   boolean logAlterOperationsOnly,
                    @Nullable String targetTopic,
                    @Nullable String targetTopic,
                    @Nullable KafkaProducer<byte[], byte[]> producer,
                    @Nullable KafkaProducer<byte[], byte[]> producer,
                    @Nullable Logger consoleLogger) implements Closeable {
                    @Nullable Logger consoleLogger) implements Closeable {
@@ -39,6 +40,10 @@ record AuditWriter(String clusterName,
   }
   }
 
 
   private void write(AuditRecord rec) {
   private void write(AuditRecord rec) {
+    if (logAlterOperationsOnly && rec.resources().stream().noneMatch(AuditResource::alter)) {
+      //we should only log alter operations, but this is read-only op
+      return;
+    }
     String json = rec.toJson();
     String json = rec.toJson();
     if (consoleLogger != null) {
     if (consoleLogger != null) {
       consoleLogger.info(json);
       consoleLogger.info(json);

+ 108 - 0
kafka-ui-api/src/test/java/com/provectus/kafka/ui/serdes/builtin/ProtobufRawSerdeTest.java

@@ -0,0 +1,108 @@
+package com.provectus.kafka.ui.serdes.builtin;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import com.google.protobuf.DescriptorProtos;
+import com.google.protobuf.Descriptors;
+import com.google.protobuf.DynamicMessage;
+import com.provectus.kafka.ui.exception.ValidationException;
+import com.provectus.kafka.ui.serde.api.Serde;
+import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;
+import lombok.SneakyThrows;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+class ProtobufRawSerdeTest {
+
+  private static final String DUMMY_TOPIC = "dummy-topic";
+
+  private ProtobufRawSerde serde;
+
+  @BeforeEach
+  void init() {
+    serde = new ProtobufRawSerde();
+  }
+
+  @SneakyThrows
+  ProtobufSchema getSampleSchema() {
+    return new ProtobufSchema(
+        """
+          syntax = "proto3";
+          message Message1 {
+            int32 my_field = 1;
+          }
+        """
+    );
+  }
+
+  @SneakyThrows
+  private byte[] getProtobufMessage() {
+    DynamicMessage.Builder builder = DynamicMessage.newBuilder(getSampleSchema().toDescriptor("Message1"));
+    builder.setField(builder.getDescriptorForType().findFieldByName("my_field"), 5);
+    return builder.build().toByteArray();
+  }
+
+  @Test
+  void deserializeSimpleMessage() {
+    var deserialized = serde.deserializer(DUMMY_TOPIC, Serde.Target.VALUE)
+        .deserialize(null, getProtobufMessage());
+    assertThat(deserialized.getResult()).isEqualTo("1: 5\n");
+  }
+
+  @Test
+  void deserializeEmptyMessage() {
+    var deserialized = serde.deserializer(DUMMY_TOPIC, Serde.Target.VALUE)
+        .deserialize(null, new byte[0]);
+    assertThat(deserialized.getResult()).isEqualTo("");
+  }
+
+  @Test
+  void deserializeInvalidMessage() {
+    var deserializer = serde.deserializer(DUMMY_TOPIC, Serde.Target.VALUE);
+    assertThatThrownBy(() -> deserializer.deserialize(null, new byte[] { 1, 2, 3 }))
+        .isInstanceOf(ValidationException.class)
+        .hasMessageContaining("Protocol message contained an invalid tag");
+  }
+  
+  @Test
+  void deserializeNullMessage() {
+    var deserializer = serde.deserializer(DUMMY_TOPIC, Serde.Target.VALUE);
+    assertThatThrownBy(() -> deserializer.deserialize(null, null))
+        .isInstanceOf(ValidationException.class)
+        .hasMessageContaining("Cannot read the array length");
+  }
+
+  ProtobufSchema getSampleNestedSchema() {
+    return new ProtobufSchema(
+      """
+        syntax = "proto3";
+        message Message2 {
+          int32 my_nested_field = 1;
+        }
+        message Message1 {
+          int32 my_field = 1;
+          Message2 my_nested_message = 2;
+        }
+      """
+    );
+  }
+
+  @SneakyThrows
+  private byte[] getComplexProtobufMessage() {
+    DynamicMessage.Builder builder = DynamicMessage.newBuilder(getSampleNestedSchema().toDescriptor("Message1"));
+    builder.setField(builder.getDescriptorForType().findFieldByName("my_field"), 5);
+    DynamicMessage.Builder nestedBuilder = DynamicMessage.newBuilder(getSampleNestedSchema().toDescriptor("Message2"));
+    nestedBuilder.setField(nestedBuilder.getDescriptorForType().findFieldByName("my_nested_field"), 10);
+    builder.setField(builder.getDescriptorForType().findFieldByName("my_nested_message"), nestedBuilder.build());
+
+    return builder.build().toByteArray();
+  }
+
+  @Test
+  void deserializeNestedMessage() {
+    var deserialized = serde.deserializer(DUMMY_TOPIC, Serde.Target.VALUE)
+        .deserialize(null, getComplexProtobufMessage());
+    assertThat(deserialized.getResult()).isEqualTo("1: 5\n2: {\n  1: 10\n}\n");
+  }
+}

+ 3 - 2
kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/SchemaRegistryPaginationTest.java

@@ -42,8 +42,9 @@ public class SchemaRegistryPaginationTest {
                 new SchemaRegistryService.SubjectWithCompatibilityLevel(
                 new SchemaRegistryService.SubjectWithCompatibilityLevel(
                     new SchemaSubject().subject(a.getArgument(1)), Compatibility.FULL)));
                     new SchemaSubject().subject(a.getArgument(1)), Compatibility.FULL)));
 
 
-    this.controller = new SchemasController(schemaRegistryService, new AccessControlServiceMock().getMock(),
-        mock(AuditService.class));
+    this.controller = new SchemasController(schemaRegistryService);
+    this.controller.setAccessControlService(new AccessControlServiceMock().getMock());
+    this.controller.setAuditService(mock(AuditService.class));
     this.controller.setClustersStorage(clustersStorage);
     this.controller.setClustersStorage(clustersStorage);
   }
   }
 
 

+ 4 - 2
kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/TopicsServicePaginationTest.java

@@ -45,8 +45,8 @@ class TopicsServicePaginationTest {
   private final ClusterMapper clusterMapper = new ClusterMapperImpl();
   private final ClusterMapper clusterMapper = new ClusterMapperImpl();
   private final AccessControlService accessControlService = new AccessControlServiceMock().getMock();
   private final AccessControlService accessControlService = new AccessControlServiceMock().getMock();
 
 
-  private final TopicsController topicsController = new TopicsController(
-      topicsService, mock(TopicAnalysisService.class), clusterMapper, accessControlService, mock(AuditService.class));
+  private final TopicsController topicsController =
+      new TopicsController(topicsService, mock(TopicAnalysisService.class), clusterMapper);
 
 
   private void init(Map<String, InternalTopic> topicsInCache) {
   private void init(Map<String, InternalTopic> topicsInCache) {
 
 
@@ -59,6 +59,8 @@ class TopicsServicePaginationTest {
           List<String> lst = a.getArgument(1);
           List<String> lst = a.getArgument(1);
           return Mono.just(lst.stream().map(topicsInCache::get).collect(Collectors.toList()));
           return Mono.just(lst.stream().map(topicsInCache::get).collect(Collectors.toList()));
         });
         });
+    topicsController.setAccessControlService(accessControlService);
+    topicsController.setAuditService(mock(AuditService.class));
     topicsController.setClustersStorage(clustersStorage);
     topicsController.setClustersStorage(clustersStorage);
   }
   }
 
 

+ 13 - 2
kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/audit/AuditServiceTest.java

@@ -30,8 +30,8 @@ class AuditServiceTest {
   @Test
   @Test
   void isAuditTopicChecksIfAuditIsEnabledForCluster() {
   void isAuditTopicChecksIfAuditIsEnabledForCluster() {
     Map<String, AuditWriter> writers = Map.of(
     Map<String, AuditWriter> writers = Map.of(
-        "c1", new AuditWriter("с1", "c1topic", null, null),
-        "c2", new AuditWriter("c2", "c2topic", mock(KafkaProducer.class), null)
+        "c1", new AuditWriter("с1", true, "c1topic", null, null),
+        "c2", new AuditWriter("c2", false, "c2topic", mock(KafkaProducer.class), null)
     );
     );
 
 
     var auditService = new AuditService(writers);
     var auditService = new AuditService(writers);
@@ -79,6 +79,17 @@ class AuditServiceTest {
           .thenReturn(mock(KafkaProducer.class));
           .thenReturn(mock(KafkaProducer.class));
     }
     }
 
 
+    @Test
+    void logOnlyAlterOpsByDefault() {
+      var auditProps = new ClustersProperties.AuditProperties();
+      auditProps.setConsoleAuditEnabled(true);
+      clustersProperties.setAudit(auditProps);
+
+      var maybeWriter = createAuditWriter(cluster, () -> adminClientMock, producerSupplierMock);
+      assertThat(maybeWriter)
+          .hasValueSatisfying(w -> assertThat(w.logAlterOperationsOnly()).isTrue());
+    }
+
     @Test
     @Test
     void noWriterIfNoAuditPropsSet() {
     void noWriterIfNoAuditPropsSet() {
       var maybeWriter = createAuditWriter(cluster, () -> adminClientMock, producerSupplierMock);
       var maybeWriter = createAuditWriter(cluster, () -> adminClientMock, producerSupplierMock);

+ 86 - 0
kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/audit/AuditWriterTest.java

@@ -0,0 +1,86 @@
+package com.provectus.kafka.ui.service.audit;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoInteractions;
+
+import com.provectus.kafka.ui.config.auth.AuthenticatedUser;
+import com.provectus.kafka.ui.model.rbac.AccessContext;
+import com.provectus.kafka.ui.model.rbac.AccessContext.AccessContextBuilder;
+import com.provectus.kafka.ui.model.rbac.permission.AclAction;
+import com.provectus.kafka.ui.model.rbac.permission.ClusterConfigAction;
+import com.provectus.kafka.ui.model.rbac.permission.ConnectAction;
+import com.provectus.kafka.ui.model.rbac.permission.ConsumerGroupAction;
+import com.provectus.kafka.ui.model.rbac.permission.SchemaAction;
+import com.provectus.kafka.ui.model.rbac.permission.TopicAction;
+import java.util.List;
+import java.util.function.UnaryOperator;
+import java.util.stream.Stream;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.junit.jupiter.api.Nested;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
+import org.mockito.Mockito;
+import org.slf4j.Logger;
+
+class AuditWriterTest {
+
+  final KafkaProducer<byte[], byte[]> producerMock = Mockito.mock(KafkaProducer.class);
+  final Logger loggerMock = Mockito.mock(Logger.class);
+  final AuthenticatedUser user = new AuthenticatedUser("someone", List.of());
+
+  @Nested
+  class AlterOperationsOnlyWriter {
+
+    final AuditWriter alterOnlyWriter = new AuditWriter("test", true, "test-topic", producerMock, loggerMock);
+
+    @ParameterizedTest
+    @MethodSource
+    void onlyLogsWhenAlterOperationIsPresentForOneOfResources(AccessContext ctxWithAlterOperation) {
+      alterOnlyWriter.write(ctxWithAlterOperation, user, null);
+      verify(producerMock).send(any(), any());
+      verify(loggerMock).info(any());
+    }
+
+    static Stream<AccessContext> onlyLogsWhenAlterOperationIsPresentForOneOfResources() {
+      Stream<UnaryOperator<AccessContextBuilder>> topicEditActions =
+          TopicAction.ALTER_ACTIONS.stream().map(a -> c -> c.topic("test").topicActions(a));
+      Stream<UnaryOperator<AccessContextBuilder>> clusterConfigEditActions =
+          ClusterConfigAction.ALTER_ACTIONS.stream().map(a -> c -> c.clusterConfigActions(a));
+      Stream<UnaryOperator<AccessContextBuilder>> aclEditActions =
+          AclAction.ALTER_ACTIONS.stream().map(a -> c -> c.aclActions(a));
+      Stream<UnaryOperator<AccessContextBuilder>> cgEditActions =
+          ConsumerGroupAction.ALTER_ACTIONS.stream().map(a -> c -> c.consumerGroup("cg").consumerGroupActions(a));
+      Stream<UnaryOperator<AccessContextBuilder>> schemaEditActions =
+          SchemaAction.ALTER_ACTIONS.stream().map(a -> c -> c.schema("sc").schemaActions(a));
+      Stream<UnaryOperator<AccessContextBuilder>> connEditActions =
+          ConnectAction.ALTER_ACTIONS.stream().map(a -> c -> c.connect("conn").connectActions(a));
+      return Stream.of(
+              topicEditActions, clusterConfigEditActions, aclEditActions,
+              cgEditActions, connEditActions, schemaEditActions
+          )
+          .flatMap(c -> c)
+          .map(setter -> setter.apply(AccessContext.builder().cluster("test").operationName("test")).build());
+    }
+
+    @ParameterizedTest
+    @MethodSource
+    void doesNothingIfNoResourceHasAlterAction(AccessContext readOnlyCxt) {
+      alterOnlyWriter.write(readOnlyCxt, user, null);
+      verifyNoInteractions(producerMock);
+      verifyNoInteractions(loggerMock);
+    }
+
+    static Stream<AccessContext> doesNothingIfNoResourceHasAlterAction() {
+      return Stream.<UnaryOperator<AccessContextBuilder>>of(
+          c -> c.topic("test").topicActions(TopicAction.VIEW),
+          c -> c.clusterConfigActions(ClusterConfigAction.VIEW),
+          c -> c.aclActions(AclAction.VIEW),
+          c -> c.consumerGroup("cg").consumerGroupActions(ConsumerGroupAction.VIEW),
+          c -> c.schema("sc").schemaActions(SchemaAction.VIEW),
+          c -> c.connect("conn").connectActions(ConnectAction.VIEW)
+      ).map(setter -> setter.apply(AccessContext.builder().cluster("test").operationName("test")).build());
+    }
+  }
+
+}

+ 3 - 0
kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml

@@ -4022,6 +4022,9 @@ components:
                       audit:
                       audit:
                         type: object
                         type: object
                         properties:
                         properties:
+                          level:
+                            type: string
+                            enum: [ "ALL", "ALTER_ONLY" ]
                           topic:
                           topic:
                             type: string
                             type: string
                           auditTopicsPartitions:
                           auditTopicsPartitions: