iliax · 2 years ago
parent
commit
d009cfb84b

+ 2 - 2
kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/BrokersController.java

@@ -89,7 +89,7 @@ public class BrokersController extends AbstractController implements BrokersApi
         .cluster(clusterName)
         .clusterConfigActions(ClusterConfigAction.VIEW)
         .auditOperation("getBrokerConfig")
-        .operationParams("brokerId", id)
+        .operationParams(Map.of("brokerId", id))
         .build();
 
     return accessControlService.validateAccess(context).thenReturn(
@@ -108,7 +108,7 @@ public class BrokersController extends AbstractController implements BrokersApi
         .cluster(clusterName)
         .clusterConfigActions(ClusterConfigAction.VIEW, ClusterConfigAction.EDIT)
         .auditOperation("updateBrokerTopicPartitionLogDir")
-        .operationParams("brokerId", id)
+        .operationParams(Map.of("brokerId", id))
         .build();
 
     return accessControlService.validateAccess(context).then(
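With the builder now taking a single Object, callers wrap named parameters in a Map, so they reach the audit record as one structured value instead of loose varargs pairs. A minimal sketch of the new call style (illustrative, not part of the commit; the package locations and the Integer broker id are assumptions):

```java
import java.util.Map;

import com.provectus.kafka.ui.model.rbac.AccessContext;                   // assumed package
import com.provectus.kafka.ui.model.rbac.permission.ClusterConfigAction;  // assumed package

class OperationParamsSketch {

  // Same shape as the controller code above: one Map carries all named
  // operation parameters that should show up in the audit log.
  AccessContext brokerConfigContext(String clusterName, Integer brokerId) {
    return AccessContext.builder()
        .cluster(clusterName)
        .clusterConfigActions(ClusterConfigAction.VIEW)
        .auditOperation("getBrokerConfig")
        .operationParams(Map.of("brokerId", brokerId))
        .build();
  }
}
```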

+ 18 - 16
kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/TopicsController.java

@@ -65,20 +65,22 @@ public class TopicsController extends AbstractController implements TopicsApi {
 
   @Override
   public Mono<ResponseEntity<TopicDTO>> createTopic(
-      String clusterName, @Valid Mono<TopicCreationDTO> topicCreation, ServerWebExchange exchange) {
-
-    var context = AccessContext.builder()
-        .cluster(clusterName)
-        .topicActions(CREATE)
-        .auditOperation("createTopic")
-        .build();
-
-    return accessControlService.validateAccess(context).then(
-        topicsService.createTopic(getCluster(clusterName), topicCreation)
-            .map(clusterMapper::toTopic)
-            .map(s -> new ResponseEntity<>(s, HttpStatus.OK))
-            .switchIfEmpty(Mono.just(ResponseEntity.notFound().build()))
-    ).doOnEach(sig -> auditService.audit(context, sig));
+      String clusterName, @Valid Mono<TopicCreationDTO> topicCreationMono, ServerWebExchange exchange) {
+    return topicCreationMono.flatMap(topicCreation -> {
+      var context = AccessContext.builder()
+          .cluster(clusterName)
+          .topicActions(CREATE)
+          .auditOperation("createTopic")
+          .operationParams(topicCreation)
+          .build();
+
+      return accessControlService.validateAccess(context)
+          .then(topicsService.createTopic(getCluster(clusterName), topicCreation))
+          .map(clusterMapper::toTopic)
+          .map(s -> new ResponseEntity<>(s, HttpStatus.OK))
+          .switchIfEmpty(Mono.just(ResponseEntity.notFound().build()))
+          .doOnEach(sig -> auditService.audit(context, sig));
+    });
   }
 
   @Override
@@ -331,8 +333,8 @@ public class TopicsController extends AbstractController implements TopicsApi {
 
     return accessControlService.validateAccess(context)
         .thenReturn(topicAnalysisService.getTopicAnalysis(getCluster(clusterName), topicName)
-        .map(ResponseEntity::ok)
-        .orElseGet(() -> ResponseEntity.notFound().build()))
+            .map(ResponseEntity::ok)
+            .orElseGet(() -> ResponseEntity.notFound().build()))
         .doOnEach(sig -> auditService.audit(context, sig));
   }
 

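The createTopic rewrite resolves the request body before building the access context, so the creation DTO can be attached via operationParams and the same context is reused when the audit signal fires. A condensed, self-contained sketch of that flow using only Reactor and Spring types; the Context record and the doCreate/audit methods are stand-ins, not the project's actual API:

```java
import org.springframework.http.ResponseEntity;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Signal;

class AuditedEndpointSketch {

  // Stand-in for AccessContext: just enough to show what gets captured.
  record Context(String operation, Object operationParams) {}

  Mono<ResponseEntity<String>> create(Mono<String> bodyMono) {
    // Resolve the body first so it can be recorded as the operation's params.
    return bodyMono.flatMap(body -> {
      var context = new Context("createTopic", body);
      return doCreate(body)
          .map(ResponseEntity::ok)
          .switchIfEmpty(Mono.just(ResponseEntity.notFound().build()))
          // doOnEach sees next, complete and error signals, so successful and
          // failed requests are both audited with the same context.
          .doOnEach(sig -> audit(context, sig));
    });
  }

  Mono<String> doCreate(String body) {
    return Mono.just(body);  // placeholder for topicsService.createTopic(...)
  }

  void audit(Context ctx, Signal<?> sig) {
    System.out.println(ctx.operation() + " " + sig.getType());
  }
}
```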
+ 1 - 1
kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/rbac/AccessContext.java

@@ -163,7 +163,7 @@ public class AccessContext {
       return this;
     }
 
-    public AccessContextBuilder operationParams(Object... operationParams) {
+    public AccessContextBuilder operationParams(Object operationParams) {
       this.operationParams = operationParams;
       return this;
     }

+ 9 - 12
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/TopicsService.java

@@ -168,21 +168,18 @@ public class TopicsService {
             .map(m -> m.values().stream().findFirst().orElse(List.of())));
   }
 
-  private Mono<InternalTopic> createTopic(KafkaCluster c, ReactiveAdminClient adminClient,
-                                          Mono<TopicCreationDTO> topicCreation) {
-    return topicCreation.flatMap(topicData ->
-            adminClient.createTopic(
-                topicData.getName(),
-                topicData.getPartitions(),
-                topicData.getReplicationFactor(),
-                topicData.getConfigs()
-            ).thenReturn(topicData)
-        )
+  private Mono<InternalTopic> createTopic(KafkaCluster c, ReactiveAdminClient adminClient, TopicCreationDTO topicData) {
+    return adminClient.createTopic(
+            topicData.getName(),
+            topicData.getPartitions(),
+            topicData.getReplicationFactor(),
+            topicData.getConfigs())
+        .thenReturn(topicData)
         .onErrorMap(t -> new TopicMetadataException(t.getMessage(), t))
-        .flatMap(topicData -> loadTopicAfterCreation(c, topicData.getName()));
+        .then(loadTopicAfterCreation(c, topicData.getName()));
   }
 
-  public Mono<InternalTopic> createTopic(KafkaCluster cluster, Mono<TopicCreationDTO> topicCreation) {
+  public Mono<InternalTopic> createTopic(KafkaCluster cluster, TopicCreationDTO topicCreation) {
     return adminClientService.get(cluster)
         .flatMap(ac -> createTopic(cluster, ac, topicCreation));
   }
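Since the Mono is now unwrapped in the controller, the public service method takes the resolved DTO directly. A hypothetical caller sketch (the class and method names here are illustrative; the fluent TopicCreationDTO setters are the ones used by KafkaAuditTest below):

```java
import com.provectus.kafka.ui.model.KafkaCluster;
import com.provectus.kafka.ui.model.TopicCreationDTO;
import com.provectus.kafka.ui.service.TopicsService;

class CreateTopicCallerSketch {

  void createOrdersTopic(TopicsService topicsService, KafkaCluster cluster) {
    // The DTO is built eagerly; no Mono<TopicCreationDTO> wrapper is needed any more.
    TopicCreationDTO creation = new TopicCreationDTO()
        .name("orders")
        .partitions(3)
        .replicationFactor(1);

    topicsService.createTopic(cluster, creation)
        .doOnError(e -> System.err.println("creation failed: " + e.getMessage()))
        .subscribe(topic -> System.out.println("created topic " + creation.getName()));
  }
}
```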

+ 9 - 9
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/audit/AuditRecord.java

@@ -34,16 +34,16 @@ record AuditRecord(String timestamp,
     return MAPPER.writeValueAsString(this);
   }
 
-  record AuditResource(PermissibleAction accessType, Resource type, @Nullable Object id) {
+  record AuditResource(String accessType, Resource type, @Nullable Object id) {
 
     static List<AuditResource> getAccessedResources(AccessContext ctx) {
       List<AuditResource> resources = new ArrayList<>();
       ctx.getClusterConfigActions()
-          .forEach(a -> resources.add(new AuditResource(a, Resource.CLUSTERCONFIG, null)));
+          .forEach(a -> resources.add(new AuditResource(a.name(), Resource.CLUSTERCONFIG, null)));
       ctx.getTopicActions()
-          .forEach(a -> resources.add(new AuditResource(a, Resource.TOPIC, nameId(ctx.getTopic()))));
+          .forEach(a -> resources.add(new AuditResource(a.name(), Resource.TOPIC, nameId(ctx.getTopic()))));
       ctx.getConsumerGroupActions()
-          .forEach(a -> resources.add(new AuditResource(a, Resource.CONSUMER, nameId(ctx.getConsumerGroup()))));
+          .forEach(a -> resources.add(new AuditResource(a.name(), Resource.CONSUMER, nameId(ctx.getConsumerGroup()))));
       ctx.getConnectActions()
           .forEach(a -> {
             Map<String, String> resourceId = new LinkedHashMap<>();
@@ -51,16 +51,16 @@ record AuditRecord(String timestamp,
             if (ctx.getConnector() != null) {
               resourceId.put("connector", ctx.getConnector());
             }
-            resources.add(new AuditResource(a, Resource.CONNECT, resourceId));
+            resources.add(new AuditResource(a.name(), Resource.CONNECT, resourceId));
           });
       ctx.getSchemaActions()
-          .forEach(a -> resources.add(new AuditResource(a, Resource.SCHEMA, nameId(ctx.getSchema()))));
+          .forEach(a -> resources.add(new AuditResource(a.name(), Resource.SCHEMA, nameId(ctx.getSchema()))));
       ctx.getKsqlActions()
-          .forEach(a -> resources.add(new AuditResource(a, Resource.KSQL, null)));
+          .forEach(a -> resources.add(new AuditResource(a.name(), Resource.KSQL, null)));
       ctx.getAclActions()
-          .forEach(a -> resources.add(new AuditResource(a, Resource.ACL, null)));
+          .forEach(a -> resources.add(new AuditResource(a.name(), Resource.ACL, null)));
       ctx.getAuditAction()
-          .forEach(a -> resources.add(new AuditResource(a, Resource.AUDIT, null)));
+          .forEach(a -> resources.add(new AuditResource(a.name(), Resource.AUDIT, null)));
       return resources;
     }
 

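Recording the action as a plain String (via name()) keeps the serialized resource flat and easy to read back with an ordinary JsonMapper, which the new KafkaAuditTest below does. A minimal round-trip sketch with a stand-in record (assumes Jackson 2.12+ for record support):

```java
import com.fasterxml.jackson.databind.json.JsonMapper;

class AccessTypeRoundTripSketch {

  // Stand-in for AuditResource: a String accessType needs no polymorphic type handling.
  record ResourceEntry(String accessType, String type, Object id) {}

  public static void main(String[] args) throws Exception {
    var mapper = new JsonMapper();
    String json = mapper.writeValueAsString(new ResourceEntry("VIEW", "TOPIC", "orders"));
    System.out.println(json);
    // {"accessType":"VIEW","type":"TOPIC","id":"orders"}
    System.out.println(mapper.readValue(json, ResourceEntry.class));
    // ResourceEntry[accessType=VIEW, type=TOPIC, id=orders]
  }
}
```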
+ 10 - 0
kafka-ui-api/src/main/resources/logback-spring.xml

@@ -7,8 +7,18 @@
         </encoder>
     </appender>
 
+    <appender name="AUDIT" class="ch.qos.logback.core.ConsoleAppender">
+        <encoder>
+            <pattern>%black(%d{ISO8601}) %yellow(%c{1}): %msg%n%throwable</pattern>
+        </encoder>
+    </appender>
+
     <root level="info">
         <appender-ref ref="STDOUT"/>
     </root>
 
+    <logger name="audit" level="info" additivity="false">
+        <appender-ref ref="AUDIT"/>
+    </logger>
+
 </configuration>
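Assuming the audit writer looks its logger up by the name wired above, i.e. LoggerFactory.getLogger("audit"), everything written through it goes to the AUDIT console appender and, with additivity off, is not duplicated on STDOUT. A minimal sketch of that assumption:

```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class AuditLoggerSketch {

  // The name must match the <logger name="audit"> element in logback-spring.xml.
  private static final Logger AUDIT_LOG = LoggerFactory.getLogger("audit");

  void write(String jsonRecord) {
    AUDIT_LOG.info(jsonRecord);  // routed to the AUDIT appender only
  }
}
```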

+ 6 - 2
kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/audit/AuditServiceTest.java

@@ -1,11 +1,15 @@
 package com.provectus.kafka.ui.service.audit;
 
-import static com.provectus.kafka.ui.service.audit.AuditService.*;
+import static com.provectus.kafka.ui.service.audit.AuditService.createAuditWriter;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.Mockito.*;
+import static org.mockito.Mockito.anyInt;
+import static org.mockito.Mockito.anyMap;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 import com.provectus.kafka.ui.config.ClustersProperties;
 import com.provectus.kafka.ui.model.KafkaCluster;

+ 83 - 0
kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/audit/KafkaAuditTest.java

@@ -0,0 +1,83 @@
+package com.provectus.kafka.ui.service.audit;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.json.JsonMapper;
+import com.provectus.kafka.ui.AbstractIntegrationTest;
+import com.provectus.kafka.ui.model.TopicCreationDTO;
+import com.provectus.kafka.ui.model.rbac.Resource;
+import java.time.Duration;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.UUID;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.serialization.BytesDeserializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.junit.jupiter.api.Test;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.test.web.reactive.server.WebTestClient;
+import org.testcontainers.shaded.org.awaitility.Awaitility;
+
+public class KafkaAuditTest extends AbstractIntegrationTest {
+
+  @Autowired
+  private WebTestClient webTestClient;
+
+  @Test
+  void auditRecordWrittenIntoKafkaWhenNewTopicCreated() {
+    String newTopicName = "test_audit_" + UUID.randomUUID();
+
+    webTestClient.post()
+        .uri("/api/clusters/{clusterName}/topics", LOCAL)
+        .bodyValue(
+            new TopicCreationDTO()
+                .replicationFactor(1)
+                .partitions(1)
+                .name(newTopicName)
+        )
+        .exchange()
+        .expectStatus()
+        .isOk();
+
+    try (var consumer = createConsumer()) {
+      var jsonMapper = new JsonMapper();
+      consumer.subscribe(List.of("__kui-audit-log"));
+      Awaitility.await()
+          .pollInSameThread()
+          .atMost(Duration.ofSeconds(10))
+          .untilAsserted(() -> {
+            var polled = consumer.poll(Duration.ofSeconds(1));
+            assertThat(polled).anyMatch(rec -> {
+              try {
+                AuditRecord record = jsonMapper.readValue(rec.value(), AuditRecord.class);
+                return Map.of(
+                    "name", newTopicName,
+                    "partitions", 1,
+                    "replicationFactor", 1,
+                    "configs", Map.of()
+                ).equals(record.operationParams())
+                    && "createTopic".equals(record.operation())
+                    && record.resources().stream().anyMatch(r -> r.type() == Resource.TOPIC)
+                    && record.result().success();
+              } catch (JsonProcessingException e) {
+                return false;
+              }
+            });
+          });
+    }
+  }
+
+  private KafkaConsumer<?, String> createConsumer() {
+    Properties props = new Properties();
+    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
+    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class);
+    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+    props.put(ConsumerConfig.GROUP_ID_CONFIG, KafkaAuditTest.class.getName());
+    return new KafkaConsumer<>(props);
+  }
+
+}