|
@@ -16,12 +16,13 @@ import org.apache.kafka.clients.consumer.ConsumerConfig;
|
|
import org.apache.kafka.clients.consumer.KafkaConsumer;
|
|
import org.apache.kafka.clients.consumer.KafkaConsumer;
|
|
import org.apache.kafka.common.serialization.BytesDeserializer;
|
|
import org.apache.kafka.common.serialization.BytesDeserializer;
|
|
import org.apache.kafka.common.serialization.StringDeserializer;
|
|
import org.apache.kafka.common.serialization.StringDeserializer;
|
|
|
|
+import org.junit.jupiter.api.Assertions;
|
|
import org.junit.jupiter.api.Test;
|
|
import org.junit.jupiter.api.Test;
|
|
import org.springframework.beans.factory.annotation.Autowired;
|
|
import org.springframework.beans.factory.annotation.Autowired;
|
|
import org.springframework.test.web.reactive.server.WebTestClient;
|
|
import org.springframework.test.web.reactive.server.WebTestClient;
|
|
import org.testcontainers.shaded.org.awaitility.Awaitility;
|
|
import org.testcontainers.shaded.org.awaitility.Awaitility;
|
|
|
|
|
|
-public class KafkaAuditTest extends AbstractIntegrationTest {
|
|
|
|
|
|
+public class AuditIntegrationTest extends AbstractIntegrationTest {
|
|
|
|
|
|
@Autowired
|
|
@Autowired
|
|
private WebTestClient webTestClient;
|
|
private WebTestClient webTestClient;
|
|
@@ -47,23 +48,26 @@ public class KafkaAuditTest extends AbstractIntegrationTest {
|
|
consumer.subscribe(List.of("__kui-audit-log"));
|
|
consumer.subscribe(List.of("__kui-audit-log"));
|
|
Awaitility.await()
|
|
Awaitility.await()
|
|
.pollInSameThread()
|
|
.pollInSameThread()
|
|
- .atMost(Duration.ofSeconds(10))
|
|
|
|
|
|
+ .atMost(Duration.ofSeconds(15))
|
|
.untilAsserted(() -> {
|
|
.untilAsserted(() -> {
|
|
var polled = consumer.poll(Duration.ofSeconds(1));
|
|
var polled = consumer.poll(Duration.ofSeconds(1));
|
|
- assertThat(polled).anyMatch(rec -> {
|
|
|
|
|
|
+ assertThat(polled).anySatisfy(kafkaRecord -> {
|
|
try {
|
|
try {
|
|
- AuditRecord record = jsonMapper.readValue(rec.value(), AuditRecord.class);
|
|
|
|
- return Map.of(
|
|
|
|
- "name", newTopicName,
|
|
|
|
- "partitions", 1,
|
|
|
|
- "replicationFactor", 1,
|
|
|
|
- "configs", Map.of()
|
|
|
|
- ).equals(record.operationParams())
|
|
|
|
- && "createTopic".equals(record.operation())
|
|
|
|
- && record.resources().stream().anyMatch(r -> r.type() == Resource.TOPIC)
|
|
|
|
- && record.result().success();
|
|
|
|
|
|
+ AuditRecord record = jsonMapper.readValue(kafkaRecord.value(), AuditRecord.class);
|
|
|
|
+ assertThat(record.operation()).isEqualTo("createTopic");
|
|
|
|
+ assertThat(record.resources()).map(AuditRecord.AuditResource::type).contains(Resource.TOPIC);
|
|
|
|
+ assertThat(record.result().success()).isTrue();
|
|
|
|
+ assertThat(record.timestamp()).isNotBlank();
|
|
|
|
+ assertThat(record.clusterName()).isEqualTo(LOCAL);
|
|
|
|
+ assertThat(record.operationParams())
|
|
|
|
+ .isEqualTo(Map.of(
|
|
|
|
+ "name", newTopicName,
|
|
|
|
+ "partitions", 1,
|
|
|
|
+ "replicationFactor", 1,
|
|
|
|
+ "configs", Map.of()
|
|
|
|
+ ));
|
|
} catch (JsonProcessingException e) {
|
|
} catch (JsonProcessingException e) {
|
|
- return false;
|
|
|
|
|
|
+ Assertions.fail();
|
|
}
|
|
}
|
|
});
|
|
});
|
|
});
|
|
});
|
|
@@ -76,7 +80,7 @@ public class KafkaAuditTest extends AbstractIntegrationTest {
|
|
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class);
|
|
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class);
|
|
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
|
|
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
|
|
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
|
|
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
|
|
- props.put(ConsumerConfig.GROUP_ID_CONFIG, KafkaAuditTest.class.getName());
|
|
|
|
|
|
+ props.put(ConsumerConfig.GROUP_ID_CONFIG, AuditIntegrationTest.class.getName());
|
|
return new KafkaConsumer<>(props);
|
|
return new KafkaConsumer<>(props);
|
|
}
|
|
}
|
|
|
|
|