iliax 2 years ago
parent
commit
af6caa0541

+ 3 - 2
kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/BrokersController.java

@@ -15,6 +15,7 @@ import com.provectus.kafka.ui.service.audit.AuditService;
 import com.provectus.kafka.ui.service.rbac.AccessControlService;
 import java.util.List;
 import java.util.Map;
+import javax.annotation.Nullable;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.http.ResponseEntity;
@@ -67,12 +68,12 @@ public class BrokersController extends AbstractController implements BrokersApi
 
   @Override
   public Mono<ResponseEntity<Flux<BrokersLogdirsDTO>>> getAllBrokersLogdirs(String clusterName,
-                                                                            List<Integer> brokers,
+                                                                            @Nullable List<Integer> brokers,
                                                                             ServerWebExchange exchange) {
     var context = AccessContext.builder()
         .cluster(clusterName)
         .auditOperation("getAllBrokersLogdirs")
-        .operationParams(Map.of("brokerIds", brokers))
+        .operationParams(Map.of("brokerIds", brokers == null ? List.of() : brokers))
         .build();
     return accessControlService.validateAccess(context)
         .thenReturn(ResponseEntity.ok(

+ 6 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/MessagesController.java

@@ -18,6 +18,7 @@ import com.provectus.kafka.ui.model.SerdeUsageDTO;
 import com.provectus.kafka.ui.model.TopicMessageEventDTO;
 import com.provectus.kafka.ui.model.TopicSerdeSuggestionDTO;
 import com.provectus.kafka.ui.model.rbac.AccessContext;
+import com.provectus.kafka.ui.model.rbac.permission.AuditAction;
 import com.provectus.kafka.ui.model.rbac.permission.TopicAction;
 import com.provectus.kafka.ui.service.DeserializationService;
 import com.provectus.kafka.ui.service.MessagesService;
@@ -32,6 +33,6 @@ import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.kafka.common.TopicPartition;
 import org.springframework.http.ResponseEntity;
 import org.springframework.web.bind.annotation.RestController;
 import org.springframework.web.server.ServerWebExchange;
@@ -85,6 +87,11 @@ public class MessagesController extends AbstractController implements MessagesAp
         .cluster(clusterName)
         .topic(topicName)
         .topicActions(MESSAGES_READ)
+        .auditActions(
+            auditService.isAuditTopic(getCluster(clusterName), topicName)
+                ? new AuditAction[] {AuditAction.VIEW}
+                : new AuditAction[] {}
+        )
         .auditOperation("getTopicMessages")
         .build();
 

+ 13 - 3
kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/rbac/AccessContext.java

@@ -2,6 +2,7 @@ package com.provectus.kafka.ui.model.rbac;
 
 import com.provectus.kafka.ui.model.rbac.permission.AclAction;
 import com.provectus.kafka.ui.model.rbac.permission.ApplicationConfigAction;
+import com.provectus.kafka.ui.model.rbac.permission.AuditAction;
 import com.provectus.kafka.ui.model.rbac.permission.ClusterConfigAction;
 import com.provectus.kafka.ui.model.rbac.permission.ConnectAction;
 import com.provectus.kafka.ui.model.rbac.permission.ConsumerGroupAction;
@@ -41,6 +42,8 @@ public class AccessContext {
 
   Collection<AclAction> aclActions;
 
+  Collection<AuditAction> auditAction;
+
   String operationName;
   Object operationParams;
 
@@ -63,8 +66,10 @@ public class AccessContext {
     private Collection<SchemaAction> schemaActions = Collections.emptySet();
     private Collection<KsqlAction> ksqlActions = Collections.emptySet();
     private Collection<AclAction> aclActions = Collections.emptySet();
-    String operationName;
-    Object operationParams;
+    private Collection<AuditAction> auditActions = Collections.emptySet();
+
+    private String operationName;
+    private Object operationParams;
 
     private AccessContextBuilder() {
     }
@@ -147,6 +152,11 @@ public class AccessContext {
       return this;
     }
 
+    public AccessContextBuilder auditActions(AuditAction... actions) {
+      this.auditActions = List.of(actions);
+      return this;
+    }
+
     public AccessContextBuilder auditOperation(String operationName) {
       this.operationName = operationName;
       return this;
@@ -171,7 +181,7 @@ public class AccessContext {
           connect, connectActions,
           connector,
           schema, schemaActions,
-          ksqlActions, aclActions,
+          ksqlActions, aclActions, auditActions,
           operationName, operationParams);
     }
   }

+ 5 - 1
kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/rbac/Permission.java

@@ -2,11 +2,13 @@ package com.provectus.kafka.ui.model.rbac;
 
 import static com.provectus.kafka.ui.model.rbac.Resource.ACL;
 import static com.provectus.kafka.ui.model.rbac.Resource.APPLICATIONCONFIG;
+import static com.provectus.kafka.ui.model.rbac.Resource.AUDIT;
 import static com.provectus.kafka.ui.model.rbac.Resource.CLUSTERCONFIG;
 import static com.provectus.kafka.ui.model.rbac.Resource.KSQL;
 
 import com.provectus.kafka.ui.model.rbac.permission.AclAction;
 import com.provectus.kafka.ui.model.rbac.permission.ApplicationConfigAction;
+import com.provectus.kafka.ui.model.rbac.permission.AuditAction;
 import com.provectus.kafka.ui.model.rbac.permission.ClusterConfigAction;
 import com.provectus.kafka.ui.model.rbac.permission.ConnectAction;
 import com.provectus.kafka.ui.model.rbac.permission.ConsumerGroupAction;
@@ -28,7 +30,8 @@ import org.springframework.util.Assert;
 @EqualsAndHashCode
 public class Permission {
 
-  private static final List<Resource> RBAC_ACTION_EXEMPT_LIST = List.of(KSQL, CLUSTERCONFIG, APPLICATIONCONFIG, ACL);
+  private static final List<Resource> RBAC_ACTION_EXEMPT_LIST =
+      List.of(KSQL, CLUSTERCONFIG, APPLICATIONCONFIG, ACL, AUDIT);
 
   Resource resource;
   List<String> actions;
@@ -79,6 +82,7 @@ public class Permission {
       case CONNECT -> Arrays.stream(ConnectAction.values()).map(Enum::toString).toList();
       case KSQL -> Arrays.stream(KsqlAction.values()).map(Enum::toString).toList();
       case ACL -> Arrays.stream(AclAction.values()).map(Enum::toString).toList();
+      case AUDIT -> Arrays.stream(AuditAction.values()).map(Enum::toString).toList();
     };
   }
 

+ 2 - 1
kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/rbac/Resource.java

@@ -12,7 +12,8 @@ public enum Resource {
   SCHEMA,
   CONNECT,
   KSQL,
-  ACL;
+  ACL,
+  AUDIT;
 
   @Nullable
   public static Resource fromString(String name) {

+ 14 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/rbac/permission/AuditAction.java

@@ -0,0 +1,14 @@
+package com.provectus.kafka.ui.model.rbac.permission;
+
+import org.apache.commons.lang3.EnumUtils;
+import org.jetbrains.annotations.Nullable;
+
+public enum AuditAction implements PermissibleAction {
+
+  VIEW;
+
+  @Nullable
+  public static AuditAction fromString(String name) {
+    return EnumUtils.getEnum(AuditAction.class, name);
+  }
+}

+ 1 - 1
kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/rbac/permission/PermissibleAction.java

@@ -4,5 +4,5 @@ public sealed interface PermissibleAction permits
     AclAction, ApplicationConfigAction,
     ConsumerGroupAction, SchemaAction,
     ConnectAction, ClusterConfigAction,
-    KsqlAction, TopicAction {
+    KsqlAction, TopicAction, AuditAction {
 }

+ 3 - 2
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/BrokerService.java

@@ -17,6 +17,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
+import javax.annotation.Nullable;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.admin.ConfigEntry;
@@ -110,7 +111,7 @@ public class BrokerService {
   }
 
   private Mono<Map<Integer, Map<String, DescribeLogDirsResponse.LogDirInfo>>> getClusterLogDirs(
-      KafkaCluster cluster, List<Integer> reqBrokers) {
+      KafkaCluster cluster, @Nullable List<Integer> reqBrokers) {
     return adminClientService.get(cluster)
         .flatMap(admin -> {
           List<Integer> brokers = statisticsCache.get(cluster).getClusterDescription().getNodes()
@@ -128,7 +129,7 @@ public class BrokerService {
         });
   }
 
-  public Flux<BrokersLogdirsDTO> getAllBrokersLogdirs(KafkaCluster cluster, List<Integer> brokers) {
+  public Flux<BrokersLogdirsDTO> getAllBrokersLogdirs(KafkaCluster cluster, @Nullable List<Integer> brokers) {
     return getClusterLogDirs(cluster, brokers)
         .map(describeLogDirsMapper::toBrokerLogDirsList)
         .flatMapMany(Flux::fromIterable);

+ 2 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/audit/AuditRecord.java

@@ -59,6 +59,8 @@ record AuditRecord(String timestamp,
           .forEach(a -> resources.add(new AuditResource(a, Resource.KSQL, null)));
       ctx.getAclActions()
           .forEach(a -> resources.add(new AuditResource(a, Resource.ACL, null)));
+      ctx.getAuditAction()
+          .forEach(a -> resources.add(new AuditResource(a, Resource.AUDIT, null)));
       return resources;
     }
 

+ 54 - 23
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/audit/AuditService.java

@@ -1,5 +1,8 @@
 package com.provectus.kafka.ui.service.audit;
 
+import static com.provectus.kafka.ui.service.MessagesService.createProducer;
+
+import com.google.common.annotations.VisibleForTesting;
 import com.provectus.kafka.ui.config.ClustersProperties;
 import com.provectus.kafka.ui.config.auth.AuthenticatedUser;
 import com.provectus.kafka.ui.config.auth.RbacUser;
@@ -7,25 +10,27 @@ import com.provectus.kafka.ui.model.KafkaCluster;
 import com.provectus.kafka.ui.model.rbac.AccessContext;
 import com.provectus.kafka.ui.service.AdminClientService;
 import com.provectus.kafka.ui.service.ClustersStorage;
-import com.provectus.kafka.ui.service.MessagesService;
 import com.provectus.kafka.ui.service.ReactiveAdminClient;
 import java.io.Closeable;
 import java.io.IOException;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
 import javax.annotation.Nullable;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.security.core.context.SecurityContext;
 import org.springframework.stereotype.Service;
 import reactor.core.publisher.Mono;
 import reactor.core.publisher.Signal;
-import reactor.core.scheduler.Schedulers;
+
 
 @Slf4j
 @Service
@@ -43,13 +48,14 @@ public class AuditService implements Closeable {
       ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip"
   );
 
-  private final Map<String, AuditWriter> auditWriters = new HashMap<>();
+  private static final Logger AUDIT_LOGGER = LoggerFactory.getLogger("audit");
 
-  public AuditService(ClustersProperties clustersProperties,
-                      AdminClientService adminClientService,
-                      ClustersStorage clustersStorage) {
-    for (var clusterProps : Optional.ofNullable(clustersProperties.getClusters()).orElse(List.of())) {
-      var cluster = clustersStorage.getClusterByName(clusterProps.getName()).orElseThrow();
+  private final Map<String, AuditWriter> auditWriters;
+
+  @Autowired
+  public AuditService(AdminClientService adminClientService, ClustersStorage clustersStorage) {
+    Map<String, AuditWriter> auditWriters = new HashMap<>();
+    for (var cluster : clustersStorage.getKafkaClusters()) {
       ReactiveAdminClient adminClient;
       try {
         adminClient = adminClientService.get(cluster).block();
@@ -57,39 +63,56 @@ public class AuditService implements Closeable {
         printAuditInitError(cluster, "Error connect to cluster", e);
         continue;
       }
-      initialize(cluster, adminClient);
+      createAuditWriter(cluster, adminClient, () -> createProducer(cluster, AUDIT_PRODUCER_CONFIG))
+          .ifPresent(writer -> auditWriters.put(cluster.getName(), writer));
     }
+    this.auditWriters = auditWriters;
+  }
+
+  @VisibleForTesting
+  AuditService(Map<String, AuditWriter> auditWriters) {
+    this.auditWriters = auditWriters;
   }
 
-  private void initialize(KafkaCluster cluster, ReactiveAdminClient ac) {
+  @VisibleForTesting
+  static Optional<AuditWriter> createAuditWriter(KafkaCluster cluster,
+                                                 ReactiveAdminClient ac,
+                                                 Supplier<KafkaProducer<byte[], byte[]>> producerFactory) {
     var auditProps = cluster.getOriginalProperties().getAudit();
     if (auditProps == null) {
-      return;
+      return Optional.empty();
     }
     boolean topicAudit = Optional.ofNullable(auditProps.getTopicAuditEnabled()).orElse(false);
     boolean consoleAudit = Optional.ofNullable(auditProps.getConsoleAuditEnabled()).orElse(false);
     if (!topicAudit && !consoleAudit) {
-      return;
+      return Optional.empty();
     }
     String auditTopicName = Optional.ofNullable(auditProps.getTopic()).orElse(DEFAULT_AUDIT_TOPIC_NAME);
     @Nullable KafkaProducer<byte[], byte[]> producer = null;
     if (topicAudit && createTopicIfNeeded(cluster, ac, auditTopicName, auditProps)) {
-      producer = MessagesService.createProducer(cluster, AUDIT_PRODUCER_CONFIG);
+      producer = producerFactory.get();
     }
-    auditWriters.put(cluster.getName(), new AuditWriter(cluster.getName(), auditTopicName, producer, consoleAudit));
     log.info("Audit service initialized for cluster '{}'", cluster.getName());
+    return Optional.of(
+        new AuditWriter(
+            cluster.getName(),
+            auditTopicName,
+            producer,
+            consoleAudit ? AUDIT_LOGGER : null
+        )
+    );
   }
 
   /**
    * @return true if topic created/existing and producing can be enabled
    */
-  private boolean createTopicIfNeeded(KafkaCluster cluster,
-                                      ReactiveAdminClient ac,
-                                      String auditTopicName,
-                                      ClustersProperties.AuditProperties auditProps) {
+  private static boolean createTopicIfNeeded(KafkaCluster cluster,
+                                             ReactiveAdminClient ac,
+                                             String auditTopicName,
+                                             ClustersProperties.AuditProperties auditProps) {
     boolean topicExists;
     try {
-      topicExists = ac.listTopics(false).block().contains(auditTopicName);
+      topicExists = ac.listTopics(true).block().contains(auditTopicName);
     } catch (Exception e) {
       printAuditInitError(cluster, "Error checking audit topic existence", e);
       return false;
@@ -116,7 +139,7 @@ public class AuditService implements Closeable {
     }
   }
 
-  private void printAuditInitError(KafkaCluster cluster, String errorMsg, Exception cause) {
+  private static void printAuditInitError(KafkaCluster cluster, String errorMsg, Exception cause) {
     log.error("-----------------------------------------------------------------");
     log.error(
         "Error initializing Audit Service for cluster '{}'. Audit will be disabled. See error below: ",
@@ -126,6 +149,13 @@ public class AuditService implements Closeable {
     log.error("-----------------------------------------------------------------");
   }
 
+  public boolean isAuditTopic(KafkaCluster cluster, String topic) {
+    var writer = auditWriters.get(cluster.getName());
+    return writer != null
+        && topic.equals(writer.targetTopic())
+        && writer.isTopicWritingEnabled();
+  }
+
   public void audit(AccessContext acxt, Signal<?> sig) {
     if (sig.isOnComplete()) {
       extractUser(sig)
@@ -148,7 +178,7 @@ public class AuditService implements Closeable {
           .map(user -> new AuthenticatedUser(user.name(), user.groups()))
           .switchIfEmpty(NO_AUTH_USER);
     } else {
-      return NO_AUTH_USER.publishOn(Schedulers.immediate());
+      return NO_AUTH_USER;
     }
   }
 
@@ -164,7 +194,8 @@ public class AuditService implements Closeable {
           writer.write(ctx, user, th);
         }
       } else {
-        //TODO: discuss app config changes - where to log?
+        // cluster-independent operation
+        AuditWriter.writeAppOperation(AUDIT_LOGGER, ctx, user, th);
       }
     } catch (Exception e) {
       log.warn("Error sending audit record", e);

+ 26 - 7
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/audit/AuditWriter.java

@@ -14,18 +14,36 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 @Slf4j
 record AuditWriter(String clusterName,
                    String targetTopic,
                    @Nullable KafkaProducer<byte[], byte[]> producer,
-                   boolean logToConsole) implements Closeable {
+                   @Nullable Logger consoleLogger) implements Closeable {
 
-  //TODO: discuss AUDIT LOG FORMAT and name
-  private static final Logger AUDIT_LOGGER = LoggerFactory.getLogger("audit");
+  boolean isTopicWritingEnabled() {
+    return producer != null;
+  }
+
+  // application-level (cluster-independent) operation
+  static void writeAppOperation(Logger consoleLogger,
+                                AccessContext ctx,
+                                AuthenticatedUser user,
+                                @Nullable Throwable th) {
+    consoleLogger.info(
+        new AuditRecord(
+            DateTimeFormatter.ISO_INSTANT.format(Instant.now()),
+            user.principal(),
+            null,
+            AuditResource.getAccessedResources(ctx),
+            ctx.getOperationName(),
+            ctx.getOperationParams(),
+            th == null ? OperationResult.successful() : OperationResult.error(th)
+        ).toJson()
+    );
+  }
 
-  public void write(AccessContext ctx, AuthenticatedUser user, @Nullable Throwable th) {
+  void write(AccessContext ctx, AuthenticatedUser user, @Nullable Throwable th) {
     write(
         new AuditRecord(
             DateTimeFormatter.ISO_INSTANT.format(Instant.now()),
@@ -41,8 +59,8 @@ record AuditWriter(String clusterName,
 
   private void write(AuditRecord rec) {
     String json = rec.toJson();
-    if (logToConsole) {
-      AUDIT_LOGGER.info(json);
+    if (consoleLogger != null) {
+      consoleLogger.info(json);
     }
     if (producer != null) {
       producer.send(
@@ -55,6 +73,7 @@ record AuditWriter(String clusterName,
     }
   }
 
+
   @Override
   public void close() {
     Optional.ofNullable(producer).ifPresent(KafkaProducer::close);

+ 19 - 3
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/rbac/AccessControlService.java

@@ -44,8 +44,6 @@ import org.springframework.security.oauth2.client.registration.InMemoryReactiveC
 import org.springframework.stereotype.Service;
 import org.springframework.util.Assert;
 import reactor.core.publisher.Mono;
-import reactor.util.function.Tuple2;
-import reactor.util.function.Tuples;
 
 @Service
 @RequiredArgsConstructor
@@ -111,7 +109,8 @@ public class AccessControlService {
                   && isConnectorAccessible(context, user) // TODO connector selectors
                   && isSchemaAccessible(context, user)
                   && isKsqlAccessible(context, user)
-                  && isAclAccessible(context, user);
+                  && isAclAccessible(context, user)
+                  && isAuditAccessible(context, user);
 
           if (!accessGranted) {
             throw new AccessDeniedException("Access denied");
@@ -384,6 +383,23 @@ public class AccessControlService {
     return isAccessible(Resource.ACL, null, user, context, requiredActions);
   }
 
+  private boolean isAuditAccessible(AccessContext context, AuthenticatedUser user) {
+    if (!rbacEnabled) {
+      return true;
+    }
+
+    if (context.getAuditAction().isEmpty()) {
+      return true;
+    }
+
+    Set<String> requiredActions = context.getAuditAction()
+        .stream()
+        .map(a -> a.toString().toUpperCase())
+        .collect(Collectors.toSet());
+
+    return isAccessible(Resource.AUDIT, null, user, context, requiredActions);
+  }
+
   public Set<ProviderAuthorityExtractor> getOauthExtractors() {
     return oauthExtractors;
   }

+ 150 - 0
kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/audit/AuditServiceTest.java

@@ -0,0 +1,150 @@
+package com.provectus.kafka.ui.service.audit;
+
+import static com.provectus.kafka.ui.service.audit.AuditService.createAuditWriter;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.*;
+import static org.mockito.Mockito.mock;
+
+import com.provectus.kafka.ui.config.ClustersProperties;
+import com.provectus.kafka.ui.model.KafkaCluster;
+import com.provectus.kafka.ui.model.rbac.AccessContext;
+import com.provectus.kafka.ui.service.ReactiveAdminClient;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Supplier;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Nested;
+import org.junit.jupiter.api.Test;
+import reactor.core.publisher.Mono;
+import reactor.core.publisher.Signal;
+
+class AuditServiceTest {
+
+  @Test
+  void isAuditTopicChecksIfAuditIsEnabledForCluster() {
+    Map<String, AuditWriter> writers = Map.of(
+        "c1", new AuditWriter("c1", "c1topic", null, null),
+        "c2", new AuditWriter("c2", "c2topic", mock(KafkaProducer.class), null)
+    );
+
+    var auditService = new AuditService(writers);
+    assertThat(auditService.isAuditTopic(KafkaCluster.builder().name("notExist").build(), "some"))
+        .isFalse();
+    assertThat(auditService.isAuditTopic(KafkaCluster.builder().name("c1").build(), "c1topic"))
+        .isFalse();
+    assertThat(auditService.isAuditTopic(KafkaCluster.builder().name("c2").build(), "c2topic"))
+        .isTrue();
+  }
+
+  @Test
+  void auditCallsWriterMethodDependingOnSignal() {
+    var auditWriter = mock(AuditWriter.class);
+    var auditService = new AuditService(Map.of("test", auditWriter));
+
+    var cxt = AccessContext.builder().cluster("test").build();
+
+    auditService.audit(cxt, Signal.complete());
+    verify(auditWriter).write(any(), any(), eq(null));
+
+    var th = new Exception("testError");
+    auditService.audit(cxt, Signal.error(th));
+    verify(auditWriter).write(any(), any(), eq(th));
+  }
+
+  @Nested
+  class CreateAuditWriter {
+
+    private final ReactiveAdminClient adminClientMock = mock(ReactiveAdminClient.class);
+    private final Supplier<KafkaProducer<byte[], byte[]>> producerSupplierMock = mock(Supplier.class);
+
+    private final ClustersProperties.Cluster clustersProperties = new ClustersProperties.Cluster();
+
+    private final KafkaCluster cluster = KafkaCluster
+        .builder()
+        .name("test")
+        .originalProperties(clustersProperties)
+        .build();
+
+
+    @BeforeEach
+    void init() {
+      when(producerSupplierMock.get())
+          .thenReturn(mock(KafkaProducer.class));
+    }
+
+    @Test
+    void noWriterIfNoAuditPropsSet() {
+      var maybeWriter = createAuditWriter(cluster, adminClientMock, producerSupplierMock);
+      assertThat(maybeWriter).isEmpty();
+    }
+
+    @Test
+    void setsLoggerIfConsoleLoggingEnabled() {
+      var auditProps = new ClustersProperties.AuditProperties();
+      auditProps.setConsoleAuditEnabled(true);
+      clustersProperties.setAudit(auditProps);
+
+      var maybeWriter = createAuditWriter(cluster, adminClientMock, producerSupplierMock);
+      assertThat(maybeWriter).isPresent();
+
+      var writer = maybeWriter.get();
+      assertThat(writer.consoleLogger()).isNotNull();
+    }
+
+    @Nested
+    class WhenTopicAuditEnabled {
+
+      @BeforeEach
+      void setTopicWriteProperties() {
+        var auditProps = new ClustersProperties.AuditProperties();
+        auditProps.setTopicAuditEnabled(true);
+        auditProps.setTopic("test_audit_topic");
+        auditProps.setAuditTopicsPartitions(3);
+        auditProps.setAuditTopicProperties(Map.of("p1", "v1"));
+        clustersProperties.setAudit(auditProps);
+      }
+
+      @Test
+      void createsProducerIfTopicExists() {
+        when(adminClientMock.listTopics(true))
+            .thenReturn(Mono.just(Set.of("test_audit_topic")));
+
+        var maybeWriter = createAuditWriter(cluster, adminClientMock, producerSupplierMock);
+        assertThat(maybeWriter).isPresent();
+
+        //checking there was no topic creation request
+        verify(adminClientMock, times(0))
+            .createTopic(any(), anyInt(), anyInt(), anyMap());
+
+        var writer = maybeWriter.get();
+        assertThat(writer.producer()).isNotNull();
+        assertThat(writer.targetTopic()).isEqualTo("test_audit_topic");
+      }
+
+      @Test
+      void createsProducerAndTopicIfItIsNotExist() {
+        when(adminClientMock.listTopics(true))
+            .thenReturn(Mono.just(Set.of()));
+
+        when(adminClientMock.createTopic(eq("test_audit_topic"), eq(3), eq(null), anyMap()))
+            .thenReturn(Mono.empty());
+
+        var maybeWriter = createAuditWriter(cluster, adminClientMock, producerSupplierMock);
+        assertThat(maybeWriter).isPresent();
+
+        //verifying topic created
+        verify(adminClientMock).createTopic(eq("test_audit_topic"), eq(3), eq(null), anyMap());
+
+        var writer = maybeWriter.get();
+        assertThat(writer.producer()).isNotNull();
+        assertThat(writer.targetTopic()).isEqualTo("test_audit_topic");
+      }
+
+    }
+  }
+
+
+}

+ 1 - 0
kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml

@@ -3465,6 +3465,7 @@ components:
         - CONNECT
         - KSQL
         - ACL
+        - AUDIT
 
     KafkaAcl:
       type: object