浏览代码

todos removed

iliax 2 年之前
父节点
当前提交
311c0bc30c

+ 3 - 3
kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/ClustersProperties.java

@@ -149,9 +149,9 @@ public class ClustersProperties {
   @AllArgsConstructor
   @AllArgsConstructor
   public static class AuditProperties {
   public static class AuditProperties {
     String topic;
     String topic;
-    Integer audiTopicsPartitions;
-    Boolean disable;
-    Boolean enableConsoleAudit;
+    Integer auditTopicsPartitions;
+    Boolean topicAuditEnabled;
+    Boolean consoleAuditEnabled;
     Map<String, String> auditTopicProperties;
     Map<String, String> auditTopicProperties;
   }
   }
 
 

+ 13 - 2
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/audit/AuditService.java

@@ -16,6 +16,7 @@ import com.provectus.kafka.ui.service.ReactiveAdminClient;
 import java.io.Closeable;
 import java.io.Closeable;
 import java.io.IOException;
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
 import java.util.Map;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Optional;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentHashMap;
@@ -53,9 +54,10 @@ public class AuditService implements Closeable {
     this.clustersStorage = clustersStorage;
     this.clustersStorage = clustersStorage;
     if (clustersProperties.getClusters() != null) {
     if (clustersProperties.getClusters() != null) {
       for (var clusterProps : clustersProperties.getClusters()) {
       for (var clusterProps : clustersProperties.getClusters()) {
+        var c = clustersStorage.getClusterByName(clusterProps.getName()).orElseThrow();
         createTopicAndProducer(
         createTopicAndProducer(
-            adminClientService,
-            clustersStorage.getClusterByName(clusterProps.getName()).orElseThrow(),
+            c,
+            adminClientService.get(c).block(),
             messagesService
             messagesService
         );
         );
       }
       }
@@ -86,7 +88,16 @@ public class AuditService implements Closeable {
                                       ReactiveAdminClient ac,
                                       ReactiveAdminClient ac,
                                       MessagesService ms) {
                                       MessagesService ms) {
     var props = c.getOriginalProperties();
     var props = c.getOriginalProperties();
+
     if (props.getAudit() != null) {
     if (props.getAudit() != null) {
+      var auditProps = props.getAudit();
+      boolean topicAudit = Optional.ofNullable(auditProps.getTopicAuditEnabled()).orElse(false);
+      boolean consoleAudit = Optional.ofNullable(auditProps.getConsoleAuditEnabled()).orElse(false);
+      String auditTopicName = Optional.ofNullable(auditProps.getTopic()).orElse(DEFAULT_AUDIT_TOPIC_NAME);
+      int auditTopicPartis = Optional.ofNullable(auditProps.getAuditTopicsPartitions()).orElse(DEFAULT_AUDIT_TOPIC_PARTITIONS);
+      Map<String, String> topicCreationProps = new HashMap<>(DEFAULT_AUDIT_TOPIC_PROPERTIES);
+      Optional.ofNullable(auditProps.getAuditTopicProperties())
+          .ifPresent(topicCreationProps::putAll);
 
 
     }
     }
   }
   }

+ 42 - 23
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/audit/AuditWriter.java

@@ -12,8 +12,12 @@ import com.provectus.kafka.ui.service.AdminClientService;
 import com.provectus.kafka.ui.service.MessagesService;
 import com.provectus.kafka.ui.service.MessagesService;
 import com.provectus.kafka.ui.service.ReactiveAdminClient;
 import com.provectus.kafka.ui.service.ReactiveAdminClient;
 import java.nio.charset.StandardCharsets;
 import java.nio.charset.StandardCharsets;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Supplier;
 import lombok.SneakyThrows;
 import lombok.SneakyThrows;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
 import org.springframework.security.access.AccessDeniedException;
 import org.springframework.security.access.AccessDeniedException;
 
 
 record AuditWriter(String targetTopic,
 record AuditWriter(String targetTopic,
@@ -21,9 +25,24 @@ record AuditWriter(String targetTopic,
                    KafkaProducer<byte[], byte[]> producer,
                    KafkaProducer<byte[], byte[]> producer,
                    boolean logToConsole) {
                    boolean logToConsole) {
 
 
-  static AuditWriter createAndInit(KafkaCluster c,
-                                   ReactiveAdminClient ac,
-                                   MessagesService ms) {
+  static AuditWriter createAndInit(ReactiveAdminClient adminClient,
+                                   Supplier<KafkaProducer<byte[], byte[]>> producerSupplier,
+                                   boolean topicAudit,
+                                   boolean consoleAudit,
+                                   String auditTopicName,
+                                   int auditTopicPartis,
+                                   Map<String, String> topicCreationProps) {
+    KafkaProducer<byte[], byte[]> producer;
+    if (topicAudit) {
+      producer = producerSupplier.get();
+    }
+  }
+
+
+  private void createTopic(ReactiveAdminClient adminClient,
+                   String auditTopicName,
+                   int parts,
+                   Map<String, String> topicCreationProps) {
 
 
   }
   }
 
 
@@ -37,7 +56,7 @@ record AuditWriter(String targetTopic,
 
 
 
 
   record AuditRecord(String timestamp,
   record AuditRecord(String timestamp,
-                     String userPrincipal, //TODO: discuss - rename to username?
+                     String userPrincipal,  //TODO: discuss - rename to username?
                      String clusterName,
                      String clusterName,
                      AuditResource resources,
                      AuditResource resources,
                      String operation,
                      String operation,
@@ -56,29 +75,29 @@ record AuditWriter(String targetTopic,
   }
   }
 
 
   record OperationResult(boolean success, OperationError error) {
   record OperationResult(boolean success, OperationError error) {
-  }
 
 
-  static OperationResult successResult() {
-    return new OperationResult(true, null);
-  }
+    static OperationResult successResult() {
+      return new OperationResult(true, null);
+    }
 
 
-  static OperationResult errorResult(Throwable th) {
-    OperationError err = OperationError.UNEXPECTED_ERROR;
-    if (th instanceof AccessDeniedException) {
-      err = OperationError.ACCESS_DENIED;
-    } else if (th instanceof ValidationException) {
-      err = OperationError.VALIDATION_ERROR;
-    } else if (th instanceof CustomBaseException) {
-      err = OperationError.EXECUTION_ERROR;
+    static OperationResult errorResult(Throwable th) {
+      OperationError err = OperationError.UNEXPECTED_ERROR;
+      if (th instanceof AccessDeniedException) {
+        err = OperationError.ACCESS_DENIED;
+      } else if (th instanceof ValidationException) {
+        err = OperationError.VALIDATION_ERROR;
+      } else if (th instanceof CustomBaseException) {
+        err = OperationError.EXECUTION_ERROR;
+      }
+      return new OperationResult(false, err);
     }
     }
-    return new OperationResult(false, err);
-  }
 
 
-  enum OperationError {
-    ACCESS_DENIED,
-    VALIDATION_ERROR,
-    EXECUTION_ERROR,
-    UNEXPECTED_ERROR
+    enum OperationError {
+      ACCESS_DENIED,
+      VALIDATION_ERROR,
+      EXECUTION_ERROR,
+      UNEXPECTED_ERROR
+    }
   }
   }
 
 
 }
 }